📄 flextt.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
 * flexTT - multi-threaded NDB API benchmark driver.
 *
 * The main thread parses the arguments, connects to the cluster, creates the
 * tables and then drives a pool of worker threads through the requested
 * phases (insert, read + update, delete) using the ThreadStart / ThreadReady
 * arrays as a simple mailbox.
 */

#include <ndb_global.h>

#include <NdbApi.hpp>
#include <NdbSchemaCon.hpp>
#include <NdbMain.h>
#include <md5_hash.hpp>

#include <NdbThread.h>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbOut.hpp>
#include <NdbTimer.hpp>

#include <NdbTest.hpp>
#include <NDBT_Error.hpp>

#define MAX_PARTS 4
#define MAX_SEEK 16
#define MAXSTRLEN 16
#define MAXATTR 64
#define MAXTABLES 64
#define MAXTHREADS 128
#define MAXPAR 1024
#define MAXATTRSIZE 1000
#define PKSIZE 1

#ifdef NDB_WIN32
/* lrand48() replacement for Windows builds; simply forwards to rand(). */
inline long lrand48(void) { return rand(); };
#endif

/* Command posted by the main thread to a worker through ThreadStart[]. */
enum StartType {
  stIdle,
  stInsert,
  stRead,
  stUpdate,
  stDelete,
  stStop
} ;

/*
 * Per-worker-thread state.  The main thread fills in threadNo before the
 * thread is created; threadLoop() initialises the remaining fields.
 */
struct ThreadNdb
{
  int       threadNo;           // Index into ThreadReady[] / ThreadStart[]
  Ndb*      threadNdb;          // Ndb object owned by this thread
  Uint32    threadBase;         // (threadNo << 16) + tNodeId
  Uint32    threadLoopCounter;
  Uint32    threadNextStart;
  Uint32    threadStop;         // tNoOfParallelTrans * tNoOfTransactions
                                //   * threadIncrement
  Uint32    threadLoopStop;     // Copy of tNoOfLoops
  Uint32    threadIncrement;    // 1, or MAX_SEEK when tLocal is set
  Uint32    threadNoCompleted;
  bool      threadCompleted;    // Polled by executeThread()
  StartType threadStartType;    // Command currently being executed
};

/* State of one of the tNoOfParallelTrans transaction slots of a thread. */
struct TransNdb
{
  char          transRecord[128];   // Filled with 0x30 by threadLoop()
  Ndb*          transNdb;
  StartType     transStartType;
  Uint32        vpn_number;
  Uint32        vpn_identity;
  Uint32        transErrorCount;
  NdbOperation* transOperation;
  ThreadNdb*    transThread;        // Owning thread
};

extern "C" { static void* threadLoop(void*); }
static void setAttrNames(void);
static void setTableNames(void);
static int readArguments(int argc, const char** argv);
static int createTables(Ndb*);
static bool defineOperation(NdbConnection* aTransObject, TransNdb*,
                            Uint32 vpn_nb, Uint32 vpn_id);
static bool executeTransaction(TransNdb* transNdbRef);
static StartType random_choice();
static void execute(StartType aType);
static bool executeThread(ThreadNdb*, TransNdb*);
static void executeCallback(int result, NdbConnection* NdbObject,
                            void* aObject);
static bool error_handler(const NdbError & err) ;
static Uint32 getKey(Uint32, Uint32) ;
static void input_error();

ErrorData * flexTTErrorData;

static NdbThread*               threadLife[MAXTHREADS];
static int                      tNodeId;
// Mailbox between main and the workers: main posts a command in
// ThreadStart[n]; worker n sets ThreadReady[n] to 1 when it has finished.
static int                      ThreadReady[MAXTHREADS];
static StartType                ThreadStart[MAXTHREADS];
static char                     tableName[1][MAXSTRLEN+1];
static char                     attrName[5][MAXSTRLEN+1];

// Program Parameters
static bool                     tInsert = false;
static bool                     tDelete = false;
static bool                     tReadUpdate = true;
static int                      tUpdateFreq = 20;
static bool                     tLocal = false;
static int                      tLocalPart = 0;
static int                      tMinEvents = 0;
static int                      tSendForce = 0;
static int                      tNoOfLoops = 1;
static Uint32                   tNoOfThreads = 1;
static Uint32                   tNoOfParallelTrans = 32;
static Uint32                   tNoOfTransactions = 500;
static Uint32                   tLoadFactor = 80;
static bool                     tempTable = false;
static bool                     startTransGuess = true;

//Program Flags
static int                      theSimpleFlag = 0;
static int                      theDirtyFlag = 0;
static int                      theWriteFlag = 0;
static int                      theTableCreateFlag = 1;

#define START_REAL_TIME
#define STOP_REAL_TIME
// START_TIMER opens a scope holding a local NdbTimer and PRINT_TIMER closes
// it again, so the three timer macros must always be used together, in order.
#define START_TIMER { NdbTimer timer; timer.doStart();
#define STOP_TIMER timer.doStop();
#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };

/* Clear the ready flag and pending command of every active worker thread. */
static void
resetThreads()
{
  for (Uint32 i = 0; i < tNoOfThreads ; i++) {
    ThreadReady[i] = 0;
    ThreadStart[i] = stIdle;
  }//for
}

/*
 * Block until every active worker has set its ThreadReady flag.
 * The flags are polled every 20 ms; the first check happens after one sleep.
 */
static void
waitForThreads(void)
{
  int cont = 0;
  do {
    cont = 0;
    NdbSleep_MilliSleep(20);
    for (Uint32 i = 0; i < tNoOfThreads ; i++) {
      if (ThreadReady[i] == 0) {
        cont = 1;
      }//if
    }//for
  } while (cont == 1);
}

/* Post the command 'what' to every active worker thread. */
static void
tellThreads(StartType what)
{
  for (Uint32 i = 0; i < tNoOfThreads ; i++)
    ThreadStart[i] = what;
}

static Ndb_cluster_connection *g_cluster_connection= 0;

/*
 * Program entry point: prints the benchmark settings, connects to the
 * cluster, creates the tables, starts the worker threads and runs the
 * selected phases.  Returns through NDBT_ProgramExit() with NDBT_OK,
 * NDBT_FAILED or NDBT_WRONGARGS.
 */
NDB_COMMAND(flexTT, "flexTT", "flexTT", "flexTT", 65535)
{
  ndb_init();
  ThreadNdb*            pThreadData;
  int                   returnValue = NDBT_OK;
  Uint32                i;      // Unsigned to match tNoOfThreads
  flexTTErrorData = new ErrorData;
  flexTTErrorData->resetErrorCounters();

  if (readArguments(argc, argv) != 0){
    input_error();
    return NDBT_ProgramExit(NDBT_WRONGARGS);
  }

  pThreadData = new ThreadNdb[MAXTHREADS];

  ndbout << endl << "FLEXTT - Starting normal mode" << endl;
  ndbout << "Perform TimesTen benchmark" << endl;
  ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
  ndbout << " " << tNoOfParallelTrans;
  ndbout << " number of parallel transaction per thread " << endl;
  ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
  ndbout << " " << tNoOfLoops << " iterations " << endl;
  ndbout << " " << "Update Frequency is " << tUpdateFreq << "%" << endl;
  ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
  if (tLocal == true) {
    ndbout << " " << "We only use Local Part = ";
    ndbout << tLocalPart << endl;
  }//if
  if (tempTable == true) {
    ndbout << " Tables are without logging " << endl;
  } else {
    ndbout << " Tables are with logging " << endl;
  }//if
  if (startTransGuess == true) {
    ndbout << " Transactions are executed with hint provided" << endl;
  } else {
    ndbout << " Transactions are executed with round robin scheme" << endl;
  }//if
  if (tSendForce == 0) {
    ndbout << " No force send is used, adaptive algorithm used" << endl;
  } else if (tSendForce == 1) {
    ndbout << " Force send used" << endl;
  } else {
    ndbout << " No force send is used, adaptive algorithm disabled" << endl;
  }//if
  ndbout << endl;

  /* print Setting */
  flexTTErrorData->printSettings(ndbout);

  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);

  setAttrNames();
  setTableNames();

  Ndb_cluster_connection con;
  if(con.connect(12, 5, 1) != 0)
  {
    // Release the per-thread array here as well; the normal exit path
    // below frees it, and this early return used to leak it.
    delete [] pThreadData;
    return NDBT_ProgramExit(NDBT_FAILED);
  }
  g_cluster_connection= &con;

  Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB");
  pNdb->init();
  tNodeId = pNdb->getNodeId();

  ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
  ndbout << endl;

  ndbout << "Waiting for ndb to become ready..." <<endl;
  if (pNdb->waitUntilReady(2000) != 0){
    ndbout << "NDB is not ready" << endl;
    ndbout << "Benchmark failed!" << endl;
    returnValue = NDBT_FAILED;
  }

  if(returnValue == NDBT_OK){
    if (createTables(pNdb) != 0){
      returnValue = NDBT_FAILED;
    }
  }

  if(returnValue == NDBT_OK){
    /****************************************************************
     *  Create NDB objects.                                         *
     ****************************************************************/
    resetThreads();
    for (i = 0; i < tNoOfThreads ; i++) {
      pThreadData[i].threadNo = i;
      threadLife[i] = NdbThread_Create(threadLoop,
                                       (void**)&pThreadData[i],
                                       32768,
                                       "flexAsynchThread",
                                       NDB_THREAD_PRIO_LOW);
    }//for
    ndbout << endl << "All NDB objects and table created" << endl << endl;
    int noOfTransacts = tNoOfParallelTrans * tNoOfTransactions *
                        tNoOfThreads * tNoOfLoops;
    /****************************************************************
     * Execute program.                                             *
     ****************************************************************/
    /****************************************************************
     * Perform inserts.                                             *
     ****************************************************************/

    if (tInsert == true) {
      tInsert = false;
      tReadUpdate = false;      // An insert run skips the read + update phase
      START_TIMER;
      execute(stInsert);
      STOP_TIMER;
      PRINT_TIMER("insert", noOfTransacts, 1);
    }//if
    /****************************************************************
     * Perform read + updates.                                      *
     ****************************************************************/

    if (tReadUpdate == true) {
      START_TIMER;
      execute(stRead);
      STOP_TIMER;
      PRINT_TIMER("update + read", noOfTransacts, 1);
    }//if
    /****************************************************************
     * Perform delete.                                              *
     ****************************************************************/

    if (tDelete == true) {
      tDelete = false;
      START_TIMER;
      execute(stDelete);
      STOP_TIMER;
      PRINT_TIMER("delete", noOfTransacts, 1);
    }//if
    ndbout << "--------------------------------------------------" << endl;

    // Ask every worker to leave its command loop, then join and destroy it.
    execute(stStop);
    void * tmp;
    for(i = 0; i<tNoOfThreads; i++){
      NdbThread_WaitFor(threadLife[i], &tmp);
      NdbThread_Destroy(&threadLife[i]);
    }
  }
  delete [] pThreadData;
  delete pNdb;

  //printing errorCounters
  flexTTErrorData->printErrorCounters(ndbout);

  return NDBT_ProgramExit(returnValue);
}//main()

/*
 * Run one phase: reset the mailbox, post 'aType' to every worker and wait
 * until all of them have reported ready.
 */
static void execute(StartType aType)
{
  resetThreads();
  tellThreads(aType);
  waitForThreads();
}//execute()

/*
 * Worker thread body.  'ThreadData' points to this thread's ThreadNdb entry.
 * The thread creates its own Ndb object and transaction slots, then serves
 * commands posted in ThreadStart[] until it sees stStop or executeThread()
 * fails.  ThreadReady[] is set after every completed command and once more
 * on exit.  Always returns NULL.
 */
static void*
threadLoop(void* ThreadData)
{
  Ndb* localNdb;
  ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
  int loc_threadNo = tabThread->threadNo;

  void * mem = malloc(sizeof(TransNdb)*tNoOfParallelTrans);
  TransNdb* pTransData = (TransNdb*)mem;

  localNdb = new Ndb(g_cluster_connection, "TEST_DB");
  localNdb->init(1024);
  localNdb->waitUntilReady();

  if (tLocal == false) {
    tabThread->threadIncrement = 1;
  } else {
    tabThread->threadIncrement = MAX_SEEK;
  }//if
  // Thread number in the upper bits, this API node's id added on top.
  tabThread->threadBase = (loc_threadNo << 16) + tNodeId;
  tabThread->threadNdb = localNdb;
  tabThread->threadStop = tNoOfParallelTrans * tNoOfTransactions;
  tabThread->threadStop *= tabThread->threadIncrement;
  tabThread->threadLoopStop = tNoOfLoops;

  Uint32 i, j;
  for (i = 0; i < tNoOfParallelTrans; i++) {
    pTransData[i].transNdb = localNdb;
    pTransData[i].transThread = tabThread;
    pTransData[i].transOperation = NULL;
    pTransData[i].transStartType = stIdle;
    pTransData[i].vpn_number = tabThread->threadBase;
    pTransData[i].vpn_identity = 0;
    pTransData[i].transErrorCount = 0;
    for (j = 0; j < 128; j++) {
      pTransData[i].transRecord[j] = 0x30;      // ASCII '0'
    }//for
  }//for

  for (;;){
    // Wait (polling every 10 ms) for the main thread to post a command.
    while (ThreadStart[loc_threadNo] == stIdle) {
      NdbSleep_MilliSleep(10);
    }//while

    // Check if signal to exit is received
    if (ThreadStart[loc_threadNo] == stStop) {
      break;
    }//if

    tabThread->threadStartType = ThreadStart[loc_threadNo];
    tabThread->threadLoopCounter = 0;
    tabThread->threadCompleted = false;
    tabThread->threadNoCompleted = 0;
    tabThread->threadNextStart = 0;

    // Consume the command before executing it.
    ThreadStart[loc_threadNo] = stIdle;
    if(!executeThread(tabThread, pTransData)){
      break;
    }
    ThreadReady[loc_threadNo] = 1;
  }//for

  free(mem);
  delete localNdb;

  ThreadReady[loc_threadNo] = 1;

  return NULL; // Thread exits
}//threadLoop()

/*
 * Start one transaction in each of the tNoOfParallelTrans slots of
 * 'atransDataArrayPtr', then keep calling sendPollNdb() until
 * tabThread->threadCompleted becomes true.
 * NOTE(review): threadCompleted is not set anywhere in the code visible
 * here - presumably executeCallback() does it; confirm.
 * Returns false as soon as a transaction cannot be started, true otherwise.
 */
static
bool
executeThread(ThreadNdb* tabThread, TransNdb* atransDataArrayPtr)
{
  Uint32 i;
  for (i = 0; i < tNoOfParallelTrans; i++) {
    TransNdb* transNdbPtr = &atransDataArrayPtr[i];
    transNdbPtr->vpn_identity = i * tabThread->threadIncrement;
    transNdbPtr->transStartType = tabThread->threadStartType;
    if (executeTransaction(transNdbPtr) == false) {
      return false;
    }//if
  }//for
  tabThread->threadNextStart = tNoOfParallelTrans * tabThread->threadIncrement;
  do {
    tabThread->threadNdb->sendPollNdb(3000, tMinEvents, tSendForce);
  } while (tabThread->threadCompleted == false);
  return true;
}//executeThread()

/*
 * Start a transaction for the slot 'transNdbRef' and define its operation
 * through defineOperation().  When startTransGuess is set the 8-byte key
 * (startKey, threadBase) is passed to startTransaction(); otherwise the
 * argument-less startTransaction() is used.
 * Returns false when the transaction cannot be started or the operation
 * cannot be defined, true otherwise.
 */
static
bool
executeTransaction(TransNdb* transNdbRef)
{
  NdbConnection* MyTrans;
  ThreadNdb* tabThread = transNdbRef->transThread;
  Ndb* aNdbObject = transNdbRef->transNdb;
  Uint32 threadBase = tabThread->threadBase;
  Uint32 startKey = transNdbRef->vpn_identity;
  if (tLocal == true) {
    startKey = getKey(startKey, threadBase);
  }//if
  if (startTransGuess == true) {
    Uint32 tKey[2];
    tKey[0] = startKey;
    tKey[1] = threadBase;
    MyTrans = aNdbObject->startTransaction((Uint32)0, //Priority
                                           (const char*)&tKey[0], //Main PKey
                                           (Uint32)8); //Key Length
  } else {
    MyTrans = aNdbObject->startTransaction();
  }//if
  if (MyTrans == NULL) {
    error_handler(aNdbObject->getNdbError());
    ndbout << endl << "Unable to recover! Quiting now" << endl ;
    return false;
  }//if
  //-------------------------------------------------------
  // Define the operation, but do not execute it yet.
  //-------------------------------------------------------
  if (!defineOperation(MyTrans, transNdbRef, startKey, threadBase))
    return false;

  return true;
}//executeTransaction()

static
Uint32
getKey(Uint32 aBase, Uint32 aThreadBase) {
  Uint32 Tfound = aBase;
  Uint32 hash;
  Uint64 Tkey64;
  Uint32* tKey32 = (Uint32*)&Tkey64;
  tKey32[0] = aThreadBase;
  for (int i = aBase; i < (aBase + MAX_SEEK); i++) {
    tKey32[1] = (Uint32)i;
    hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -