📄 flex_bench_mysql.cpp
字号:
} if(returnValue == NDBT_OK){ sleepBeforeStartingTest(tSleepTime); /**************************************************************** * Create threads. * ****************************************************************/ resetThreads(pThreadsData); for (unsigned int i = 0; i < tNoOfThreads; i++){ pThreadsData[i].threadNo = i; pThreadsData[i].threadLife = NdbThread_Create(flexBenchThread, (void**)&pThreadsData[i], 32768, "flexBenchThread", NDB_THREAD_PRIO_LOW); } waitForThreads(pThreadsData); ndbout << endl << "All threads started" << endl << endl; /**************************************************************** * Execute program. * ****************************************************************/ for(;;){ int loopCount = tLoops + 1; ndbout << endl << "Loop # " << loopCount << endl << endl; /**************************************************************** * Perform inserts. * ****************************************************************/ // Reset and start timer START_TIMER; // Give insert-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stInsert); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing insert" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Verify inserts. * ****************************************************************/ if (VerifyFlag) { resetThreads(pThreadsData); ndbout << "Verifying inserts...\t" ; tellThreads(pThreadsData, stVerify); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed while verifying inserts" << endl; returnValue = NDBT_FAILED; break; }else{ ndbout << "\t\tOK" << endl << endl ; } } /**************************************************************** * Perform read. * ****************************************************************/ // Reset and start timer START_TIMER; // Give read-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stRead); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing read" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Perform update. * ****************************************************************/ // Reset and start timer START_TIMER; // Give update-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stUpdate); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing update" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Verify updates. * ****************************************************************/ if (VerifyFlag) { resetThreads(pThreadsData); ndbout << "Verifying updates...\t" ; tellThreads(pThreadsData, stVerify); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed while verifying updates" << endl; returnValue = NDBT_FAILED; break; }else{ ndbout << "\t\tOK" << endl << endl ; } } /**************************************************************** * Perform read. * ****************************************************************/ // Reset and start timer START_TIMER; // Give read-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stRead); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing read" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Perform delete. * ****************************************************************/ // Reset and start timer START_TIMER; // Give delete-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stDelete); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing delete" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Verify deletes. * ****************************************************************/ if (VerifyFlag) { resetThreads(pThreadsData); ndbout << "Verifying tuple deletion..." ; tellThreads(pThreadsData, stVerifyDelete); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in verifying deletes" << endl; returnValue = NDBT_FAILED; break; }else{ ndbout << "\t\tOK" << endl << endl ; } } ndbout << "--------------------------------------------------" << endl; tLoops++; if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops ) break; theErrorData.printErrorCounters(); } resetThreads(pThreadsData); tellThreads(pThreadsData, stStop); waitForThreads(pThreadsData); void * tmp; for(Uint32 i = 0; i<tNoOfThreads; i++){ NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp); NdbThread_Destroy(&pThreadsData[i].threadLife); } }#ifdef USE_MYSQL if (!use_ndb) { dropTables(&mysql); mysql_close(&mysql); }#endif if (use_ndb) { drop_instance(); } theErrorData.printErrorCounters(); return NDBT_ProgramExit(returnValue);}////////////////////////////////////////unsigned long get_hash(unsigned long * hash_key, int len){ unsigned long hash_value = 147; unsigned h_key; int i; for (i = 0; i < len; i++) { h_key = hash_key[i]; hash_value = (hash_value << 5) + hash_value + (h_key & 255); hash_value = (hash_value << 5) + hash_value + ((h_key >> 8) & 255); hash_value = (hash_value << 5) + hash_value + ((h_key >> 16) & 255); hash_value = (hash_value << 5) + hash_value + ((h_key >> 24) & 255); } return hash_value;}// End of warming up phasestatic void* flexBenchThread(void* pArg){ ThreadData* pThreadData = (ThreadData*)pArg; unsigned int threadNo, threadBase; Ndb* pNdb = NULL ; Uint32 ndb_id = 0; NdbConnection *pTrans = NULL ; NdbOperation** pOps = NULL ; StartType tType ; StartType tSaveType ; NdbRecAttr* tTmp = NULL ; int* attrValue = NULL ; int* attrRefValue = NULL ; int check = 0 ; int loopCountOps, loopCountTables, loopCountAttributes; int tAttemptNo = 0; int tRetryAttempts = 20; int tResult = 0; int tSpecialTrans = 0; int nRefLocalOpOffset = 0 ; int nReadBuffSize = tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ; int nRefBuffSize = tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ; unsigned*** longKeyAttrValue = NULL; threadNo = pThreadData->threadNo ;#ifdef USE_MYSQL MYSQL mysql; int the_socket = sockets[threadNo % n_sockets]; char the_socket_name[1024]; //sprintf(the_socket_name, "%s", "/tmp/mysql.sock"); sprintf(the_socket_name, "%s%u%s", "/tmp/mysql.",the_socket,".sock"); if (!use_ndb) { ndbout << the_socket_name << endl; ndbout << "Thread connecting to MySQL... " << endl; mysql_init(&mysql); if ( mysql_real_connect(&mysql, "localhost", "root", "", "test", the_socket, the_socket_name, 0) == NULL ) { ndbout << "failed" << endl; return 0; } mysql.reconnect= 1; ndbout << "ok" << endl; int r; if (tNoOfTables > 1) r = mysql_autocommit(&mysql, 0); else r = mysql_autocommit(&mysql, 1); if (r) { ndbout << "autocommit on/off failed" << endl; return 0; } }#endif NdbAutoPtr<int> p00( attrValue= (int*)malloc(nReadBuffSize) ) ; NdbAutoPtr<int> p01( attrRefValue= (int*)malloc(nRefBuffSize) ); if (use_ndb) { pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ; } NdbAutoPtr<NdbOperation*> p02( pOps ); if( !attrValue || !attrRefValue || ( use_ndb && ( !pOps) ) ){ // Check allocations to make sure we got all the memory we asked for ndbout << "One or more memory allocations failed when starting thread #"; ndbout << threadNo << endl ; ndbout << "Thread #" << threadNo << " will now exit" << endl ; tResult = 13 ; return 0; } if (use_ndb) { pNdb = get_ndb_object(ndb_id, "test", "def"); if (pNdb == NULL) { ndbout << "Failed to get an NDB object" << endl; ndbout << "Thread #" << threadNo << " will now exit" << endl ; tResult = 13; return 0; } pNdb->waitUntilReady(); return_ndb_object(pNdb, ndb_id); pNdb = NULL; } // To make sure that two different threads doesn't operate on the same record // Calculate an "unique" number to use as primary key threadBase = (threadNo * 2000000) + (tNodeId * 260000000); NdbAutoPtr<char> p22; if(useLongKeys){ // Allocate and populate the longkey array. int e1 = sizeof(unsigned**) * tNoOfOperations; int e2 = sizeof(unsigned*) * tNoOfLongPK * tNoOfOperations; int e3 = sizeof(unsigned) * tSizeOfLongPK * tNoOfLongPK * tNoOfOperations; char* tmp; p22.reset(tmp = (char*)malloc(e1+e2+e3)); longKeyAttrValue = (unsigned ***) tmp; tmp += e1; for (Uint32 n = 0; n < tNoOfOperations; n++) { longKeyAttrValue[n] = (unsigned **) tmp; tmp += sizeof(unsigned*) * tNoOfLongPK; } for (Uint32 n = 0; n < tNoOfOperations; n++){ for (Uint32 i = 0; i < tNoOfLongPK ; i++) { longKeyAttrValue[n][i] = (unsigned *) tmp; tmp += sizeof(unsigned) * tSizeOfLongPK; memset(longKeyAttrValue[n][i], 0, sizeof(unsigned) * tSizeOfLongPK); for(Uint32 j = 0; j < tSizeOfLongPK; j++) { // Repeat the unique value to fill up the long key. longKeyAttrValue[n][i][j] = threadBase + n; } } } } int nRefOpOffset = 0 ; //Assign reference attribute values to memory for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){ // Calculate offset value before going into the next loop nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ; for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){ *(int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] = (int)(threadBase + ops + a) ; } }#ifdef CEBIT_STAT // ops not yet reported int statOps = 0;#endif#ifdef USE_MYSQL // temporary buffer to store prepared statement text char buf[2048]; MYSQL_STMT** prep_read = NULL; MYSQL_STMT** prep_delete = NULL; MYSQL_STMT** prep_update = NULL; MYSQL_STMT** prep_insert = NULL; MYSQL_BIND* bind_delete = NULL; MYSQL_BIND* bind_read = NULL; MYSQL_BIND* bind_update = NULL; MYSQL_BIND* bind_insert = NULL; int* mysql_data = NULL; NdbAutoPtr<char> p21; if (!use_ndb) { // data array to which prepared statements are bound char* tmp; int e1 = sizeof(int)*tAttributeSize*tNoOfAttributes; int e2 = sizeof(MYSQL_BIND)*tNoOfAttributes; int e3 = sizeof(MYSQL_BIND)*tNoOfAttributes; int e4 = sizeof(MYSQL_BIND)*tNoOfAttributes; int e5 = sizeof(MYSQL_BIND)*1; int e6 = sizeof(MYSQL_STMT*)*tNoOfTables; int e7 = sizeof(MYSQL_STMT*)*tNoOfTables; int e8 = sizeof(MYSQL_STMT*)*tNoOfTables; int e9 = sizeof(MYSQL_STMT*)*tNoOfTables; p21.reset(tmp = (char*)malloc(e1+e2+e3+e4+e5+e6+e7+e8+e9)); mysql_data = (int*)tmp; tmp += e1; bind_insert = (MYSQL_BIND*)tmp; tmp += e2; bind_update = (MYSQL_BIND*)tmp; tmp += e3; bind_read = (MYSQL_BIND*)tmp; tmp += e4; bind_delete = (MYSQL_BIND*)tmp; tmp += e5; prep_insert = (MYSQL_STMT**)tmp; tmp += e6; prep_update = (MYSQL_STMT**)tmp; tmp += e7; prep_read = (MYSQL_STMT**)tmp; tmp += e8; prep_delete = (MYSQL_STMT**)tmp; for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){ MYSQL_BIND& bi = bind_insert[ca]; bi.buffer_type = MYSQL_TYPE_LONG; bi.buffer = (char*)&mysql_data[ca*tAttributeSize]; bi.buffer_length = 0; bi.length = NULL; bi.is_null = NULL; }//for for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){ MYSQL_BIND& bi = bind_update[ca]; bi.buffer_type = MYSQL_TYPE_LONG; if ( ca == tNoOfAttributes-1 ) // the primary key comes last in statement bi.buffer = (char*)&mysql_data[0]; else bi.buffer = (char*)&mysql_data[(ca+1)*tAttributeSize]; bi.buffer_length = 0; bi.length = NULL; bi.is_null = NULL; }//for for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){ MYSQL_BIND& bi = bind_read[ca]; bi.buffer_type = MYSQL_TYPE_LONG; bi.buffer = (char*)&mysql_data[ca*tAttributeSize]; bi.buffer_length = 4; bi.length = NULL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -