flextimedasynch.cpp
来自「MySQL数据库开发源码 值得一看哦」· C++ 代码 · 共 853 行 · 第 1/2 页
CPP
853 行
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *//* *************************************************** FLEXTIMEDASYNCH Perform benchmark of insert, update and delete transactions. Arguments: -t Number of threads to start, i.e., number of parallel loops, default 1 -p Number of transactions in a batch, default 32 -o Number of batches per loop, default 200 -i Time between batch starts, default 0 -l Number of loops to run, default 1, 0=infinite -a Number of attributes, default 25 -c Number of operations per transaction -s Size of each attribute in 32 bit word, default 1 (Primary Key is always of size 1, independent of this value) -simple Use simple read to read from database -dirty Use dirty read to read from database -write Use writeTuple in insert and update -n Use standard table names -no_table_create Don't create tables in db -temp Use temporary tables, no writing to disk. Returns: 0 - Test passed -1 - Test failed 1 - Invalid arguments * *************************************************** */#include "NdbApi.hpp"#include <NdbThread.h>#include <NdbSleep.h>#include <NdbTick.h>#include <NdbOut.hpp>#include <NdbTimer.hpp>#include <string.h>#include <NdbMain.h>#include <NdbTest.hpp>#include <NDBT_Error.hpp>#define MAXSTRLEN 16 #define MAXATTR 64#define MAXTABLES 64#define MAXTHREADS 256#define MAXATTRSIZE 1000#define PKSIZE 1enum StartType { stIdle, stInsert, stRead, stUpdate, stDelete, stStop } ;ErrorData * flexTimedAsynchErrorData; struct ThreadNdb{ int NoOfOps; int ThreadNo; unsigned int threadBase; unsigned int transactionCompleted;};extern "C" void* threadLoop(void*);void setAttrNames(void);void setTableNames(void);void readArguments(int argc, const char** argv);void createAttributeSpace();void createTables(Ndb*);void defineOperation(NdbConnection* aTransObject, StartType aType, unsigned int key, int *);void execute(StartType aType);void executeThread(StartType aType, Ndb* aNdbObject, ThreadNdb* threadInfo);void executeCallback(int result, NdbConnection* NdbObject, void* aObject);/* epaulsa > *************************************************************/bool error_handler(const NdbError &) ; //replaces 'goto' thingsstatic int failed = 0 ; // lame global variable that keeps track of failed transactions // incremented in executeCallback() and reset in main()/************************************************************* < epaulsa */static NdbThread* threadLife[MAXTHREADS];static int tNodeId;static int ThreadReady[MAXTHREADS];static StartType ThreadStart[MAXTHREADS];static char tableName[MAXTABLES][MAXSTRLEN+1];static char attrName[MAXATTR][MAXSTRLEN+1];static int *getAttrValueTable;// Program Parametersstatic int tNoOfLoops = 1;static int tAttributeSize = 1;static unsigned int tNoOfThreads = 1;static unsigned int tNoOfTransInBatch = 32;static unsigned int tNoOfAttributes = 25;static unsigned int tNoOfBatchesInLoop = 200;static unsigned int tNoOfOpsPerTrans = 1;static unsigned int tTimeBetweenBatches = 0;//Program Flagsstatic int theTestFlag = 0;static int theTempFlag = 1;static int theSimpleFlag = 0;static int theDirtyFlag = 0;static int theWriteFlag = 0;static int theStdTableNameFlag = 0;static int theTableCreateFlag = 0;#define START_REAL_TIME NdbTimer timer; timer.doStart();#define STOP_REAL_TIME timer.doStop(); #define START_TIMER { NdbTimer timer; timer.doStart();#define STOP_TIMER timer.doStop();#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; void resetThreads(){ for (int i = 0; i < tNoOfThreads ; i++) { ThreadReady[i] = 0; ThreadStart[i] = stIdle; }}void waitForThreads(void){ int cont; do { cont = 0; NdbSleep_MilliSleep(20); for (int i = 0; i < tNoOfThreads ; i++) { if (ThreadReady[i] == 0) { cont = 1; } } } while (cont == 1);}void tellThreads(StartType what){ for (int i = 0; i < tNoOfThreads ; i++) ThreadStart[i] = what;}void createAttributeSpace(){ getAttrValueTable = new int[tAttributeSize* tNoOfThreads * tNoOfAttributes ];}void deleteAttributeSpace(){ delete [] getAttrValueTable;}NDB_COMMAND(flexTimedAsynch, "flexTimedAsynch", "flexTimedAsynch [-tpoilcas]", "flexTimedAsynch", 65535){ ndb_init(); ThreadNdb tabThread[MAXTHREADS]; int tLoops=0; int returnValue; //NdbOut flexTimedAsynchNdbOut; flexTimedAsynchErrorData = new ErrorData; flexTimedAsynchErrorData->resetErrorCounters(); Ndb* pNdb; pNdb = new Ndb( "TEST_DB" ); pNdb->init(); readArguments(argc, argv); createAttributeSpace(); ndbout << endl << "FLEXTIMEDASYNCH - Starting normal mode" << endl; ndbout << "Perform benchmark of insert, update and delete transactions" << endl << endl; if(theTempFlag == 0) ndbout << " " << "Using temporary tables. " << endl; // -t, tNoOfThreads ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl; // -c, tNoOfOpsPerTrans ndbout << " " << tNoOfOpsPerTrans << " operations per transaction " << endl; // -p, tNoOfTransInBatch ndbout << " " << tNoOfTransInBatch << " number of transactions in a batch per thread " << endl; // -o, tNoOfBatchesInLoop ndbout << " " << tNoOfBatchesInLoop << " number of batches per loop " << endl; // -i, tTimeBetweenBatches ndbout << " " << tTimeBetweenBatches << " milli seconds at least between batch starts " << endl; // -l, tNoOfLoops ndbout << " " << tNoOfLoops << " loops " << endl; // -a, tNoOfAttributes ndbout << " " << tNoOfAttributes << " attributes per table " << endl; // -s, tAttributeSize ndbout << " " << tAttributeSize << " is the number of 32 bit words per attribute " << endl << endl; NdbThread_SetConcurrencyLevel(2 + tNoOfThreads); /* print Setting */ flexTimedAsynchErrorData->printSettings(ndbout); setAttrNames(); setTableNames(); ndbout << "Waiting for ndb to become ready..." <<endl; if (pNdb->waitUntilReady() == 0) { tNodeId = pNdb->getNodeId(); ndbout << " NdbAPI node with id = " << tNodeId << endl; createTables(pNdb); /**************************************************************** * Create NDB objects. * ****************************************************************/ resetThreads(); for (int i = 0; i < tNoOfThreads ; i++) { tabThread[i].ThreadNo = i; threadLife[i] = NdbThread_Create(threadLoop, (void**)&tabThread[i], 32768, "flexTimedAsynchThread", NDB_THREAD_PRIO_LOW); } ndbout << endl << "All NDB objects and table created" << endl << endl; int noOfTransacts = tNoOfTransInBatch*tNoOfBatchesInLoop*tNoOfThreads; /**************************************************************** * Execute program. * ****************************************************************/ for(;;) { int loopCount = tLoops + 1 ; ndbout << endl << "Loop # " << loopCount << endl << endl ; /**************************************************************** * Perform inserts. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stInsert); STOP_TIMER; PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { ndbout << failed << " of the transactions returned errors!, moving on now..."<<endl ; } /**************************************************************** * Perform read. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stRead); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { ndbout << failed << " of the transactions returned errors!, moving on now..."<<endl ; } /**************************************************************** * Perform update. * ***************************************************************/ failed = 0 ; START_TIMER; execute(stUpdate); STOP_TIMER; PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ; if (0 < failed) { ndbout << failed << " of the transactions returned errors!, moving on now..."<<endl ; } /**************************************************************** * Perform read after update. ****************************************************************/ failed = 0 ; START_TIMER; execute(stRead); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { ndbout << failed << " of the transactions returned errors!, moving on now..."<<endl ; } /**************************************************************** * Perform delete. * ****************************************************************/ failed = 0; START_TIMER; execute(stDelete); STOP_TIMER; PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { ndbout << failed << " of the transactions returned errors!, moving on now..."<<endl ; } tLoops++; ndbout << "--------------------------------------------------" << endl; if(tNoOfLoops != 0){ if(tNoOfLoops <= tLoops) break ; } } ndbout << endl << "Benchmark completed!" << endl; returnValue = NDBT_OK; execute(stStop); void * tmp; for(int i = 0; i<tNoOfThreads; i++){ NdbThread_WaitFor(threadLife[i], &tmp); NdbThread_Destroy(&threadLife[i]); } } else { ndbout << "NDB is not ready" << endl; ndbout << "Benchmark failed!" << endl; returnValue = NDBT_FAILED; } deleteAttributeSpace(); delete pNdb; //printing errorCounters flexTimedAsynchErrorData->printErrorCounters(ndbout); return NDBT_ProgramExit(returnValue);}//main()////////////////////////////////////////void execute(StartType aType){ resetThreads(); tellThreads(aType); waitForThreads();}void*threadLoop(void* ThreadData){ // Do work until signaled to stop. Ndb* localNdb; StartType tType; ThreadNdb* threadInfo = (ThreadNdb*)ThreadData; int threadNo = threadInfo->ThreadNo; localNdb = new Ndb("TEST_DB"); localNdb->init(512); localNdb->waitUntilReady(); threadInfo->threadBase = (threadNo * 2000000) + (tNodeId * 260000000); for (;;) { while (ThreadStart[threadNo] == stIdle) { NdbSleep_MilliSleep(10); } // Check if signal to exit is received if (ThreadStart[threadNo] == stStop) { break; } tType = ThreadStart[threadNo]; ThreadStart[threadNo] = stIdle; executeThread(tType, localNdb, threadInfo); ThreadReady[threadNo] = 1; } delete localNdb; ThreadReady[threadNo] = 1; return NULL; // thread exits}void executeThread(StartType aType, Ndb* aNdbObject, ThreadNdb* threadInfo){ // Do all batch job in loop with start specified delay int i, j, k; NdbConnection* tConArray[1024]; unsigned int tBase; unsigned int tBase2; int threadId = threadInfo->ThreadNo; int *getValueRowAddress = NULL; NdbTimer timer; timer.doStart(); for (i = 0; i < tNoOfBatchesInLoop; i++) { //tBase = threadBase + (i * tNoOfTransInBatch * tNoOfOpsPerTrans);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?