⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 flextt.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <ndb_global.h>#include <NdbApi.hpp>#include <NdbSchemaCon.hpp>#include <NdbMain.h>#include <md5_hash.hpp>#include <NdbThread.h>#include <NdbSleep.h>#include <NdbTick.h>#include <NdbOut.hpp>#include <NdbTimer.hpp>#include <NdbTest.hpp>#include <NDBT_Error.hpp>#define MAX_PARTS 4 #define MAX_SEEK 16 #define MAXSTRLEN 16 #define MAXATTR 64#define MAXTABLES 64#define MAXTHREADS 128#define MAXPAR 1024#define MAXATTRSIZE 1000#define PKSIZE 1#ifdef NDB_WIN32inline long lrand48(void) { return rand(); };#endifenum StartType {   stIdle,   stInsert,  stRead,  stUpdate,  stDelete,   stStop } ;struct ThreadNdb{  int threadNo;  Ndb* threadNdb;  Uint32 threadBase;  Uint32 threadLoopCounter;  Uint32 threadNextStart;  Uint32 threadStop;  Uint32 threadLoopStop;  Uint32 threadIncrement;  Uint32 threadNoCompleted;  bool   threadCompleted;  StartType threadStartType;};struct TransNdb{  char transRecord[128];  Ndb* transNdb;  StartType  transStartType;  Uint32     vpn_number;  Uint32     vpn_identity;  Uint32     transErrorCount;  NdbOperation* transOperation;  ThreadNdb* transThread;};extern "C" { static void* threadLoop(void*); }static void setAttrNames(void);static void setTableNames(void);static int readArguments(int argc, const char** argv);static int createTables(Ndb*);static bool defineOperation(NdbConnection* aTransObject, TransNdb*,                            Uint32 vpn_nb, Uint32 vpn_id);static bool executeTransaction(TransNdb* transNdbRef);static StartType random_choice();static void execute(StartType aType);static bool executeThread(ThreadNdb*, TransNdb*);static void executeCallback(int result, NdbConnection* NdbObject,                            void* aObject);static bool error_handler(const NdbError & err) ;static Uint32 getKey(Uint32, Uint32) ;static void input_error();                                      ErrorData * flexTTErrorData;static NdbThread*                       threadLife[MAXTHREADS];static int                              tNodeId;static int                              ThreadReady[MAXTHREADS];static StartType                        ThreadStart[MAXTHREADS];static char                             tableName[1][MAXSTRLEN+1];static char                             attrName[5][MAXSTRLEN+1];// Program Parametersstatic bool                             tInsert = false;static bool                             tDelete = false;static bool                             tReadUpdate = true;static int                              tUpdateFreq = 20;static bool                             tLocal = false;static int                              tLocalPart = 0;static int                              tMinEvents = 0;static int                              tSendForce = 0;static int                              tNoOfLoops = 1;static Uint32                           tNoOfThreads = 1;static Uint32                           tNoOfParallelTrans = 32;static Uint32                           tNoOfTransactions = 500;static Uint32                           tLoadFactor = 80;static bool                             tempTable = false;static bool                             startTransGuess = true;//Program Flagsstatic int                              theSimpleFlag = 0;static int                              theDirtyFlag = 0;static int                              theWriteFlag = 0;static int                              theTableCreateFlag = 1;#define START_REAL_TIME#define STOP_REAL_TIME#define START_TIMER { NdbTimer timer; timer.doStart();#define STOP_TIMER timer.doStop();#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; static void resetThreads(){  for (int i = 0; i < tNoOfThreads ; i++) {    ThreadReady[i] = 0;    ThreadStart[i] = stIdle;  }//for}static void waitForThreads(void){  int cont = 0;  do {    cont = 0;    NdbSleep_MilliSleep(20);    for (int i = 0; i < tNoOfThreads ; i++) {      if (ThreadReady[i] == 0) {        cont = 1;      }//if    }//for  } while (cont == 1);}static void tellThreads(StartType what){  for (int i = 0; i < tNoOfThreads ; i++)     ThreadStart[i] = what;}static Ndb_cluster_connection *g_cluster_connection= 0;NDB_COMMAND(flexTT, "flexTT", "flexTT", "flexTT", 65535){  ndb_init();  ThreadNdb*            pThreadData;  int                   returnValue = NDBT_OK;  int i;  flexTTErrorData = new ErrorData;  flexTTErrorData->resetErrorCounters();  if (readArguments(argc, argv) != 0){    input_error();    return NDBT_ProgramExit(NDBT_WRONGARGS);  }  pThreadData = new ThreadNdb[MAXTHREADS];  ndbout << endl << "FLEXTT - Starting normal mode" << endl;  ndbout << "Perform TimesTen benchmark" << endl;  ndbout << "  " << tNoOfThreads << " number of concurrent threads " << endl;  ndbout << "  " << tNoOfParallelTrans;  ndbout << " number of parallel transaction per thread " << endl;  ndbout << "  " << tNoOfTransactions << " transaction(s) per round " << endl;  ndbout << "  " << tNoOfLoops << " iterations " << endl;  ndbout << "  " << "Update Frequency is " << tUpdateFreq << "%" << endl;  ndbout << "  " << "Load Factor is " << tLoadFactor << "%" << endl;  if (tLocal == true) {    ndbout << "  " << "We only use Local Part = ";    ndbout << tLocalPart << endl;  }//if  if (tempTable == true) {    ndbout << "  Tables are without logging " << endl;  } else {    ndbout << "  Tables are with logging " << endl;  }//if  if (startTransGuess == true) {    ndbout << "  Transactions are executed with hint provided" << endl;  } else {    ndbout << "  Transactions are executed with round robin scheme" << endl;  }//if  if (tSendForce == 0) {    ndbout << "  No force send is used, adaptive algorithm used" << endl;  } else if (tSendForce == 1) {    ndbout << "  Force send used" << endl;  } else {    ndbout << "  No force send is used, adaptive algorithm disabled" << endl;  }//if  ndbout << endl;  /* print Setting */  flexTTErrorData->printSettings(ndbout);  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);  setAttrNames();  setTableNames();  Ndb_cluster_connection con;  if(con.connect(12, 5, 1) != 0)  {    return NDBT_ProgramExit(NDBT_FAILED);  }  g_cluster_connection= &con;  Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB");        pNdb->init();  tNodeId = pNdb->getNodeId();  ndbout << "  NdbAPI node with id = " << pNdb->getNodeId() << endl;  ndbout << endl;    ndbout << "Waiting for ndb to become ready..." <<endl;  if (pNdb->waitUntilReady(2000) != 0){    ndbout << "NDB is not ready" << endl;    ndbout << "Benchmark failed!" << endl;    returnValue = NDBT_FAILED;  }  if(returnValue == NDBT_OK){    if (createTables(pNdb) != 0){      returnValue = NDBT_FAILED;    }  }  if(returnValue == NDBT_OK){    /****************************************************************     *  Create NDB objects.                                   *     ****************************************************************/    resetThreads();    for (i = 0; i < tNoOfThreads ; i++) {      pThreadData[i].threadNo = i;      threadLife[i] = NdbThread_Create(threadLoop,                                       (void**)&pThreadData[i],                                       32768,                                       "flexAsynchThread",                                       NDB_THREAD_PRIO_LOW);    }//for    ndbout << endl <<  "All NDB objects and table created" << endl << endl;    int noOfTransacts = tNoOfParallelTrans * tNoOfTransactions *                        tNoOfThreads * tNoOfLoops;    /****************************************************************     * Execute program.                                             *     ****************************************************************/    /****************************************************************     * Perform inserts.                                             *     ****************************************************************/              if (tInsert == true) {      tInsert = false;      tReadUpdate = false;      START_TIMER;      execute(stInsert);      STOP_TIMER;      PRINT_TIMER("insert", noOfTransacts, 1);    }//if    /****************************************************************     * Perform read + updates.                                      *     ****************************************************************/          if (tReadUpdate == true) {      START_TIMER;      execute(stRead);      STOP_TIMER;      PRINT_TIMER("update + read", noOfTransacts, 1);    }//if      /****************************************************************     * Perform delete.                                              *     ****************************************************************/                    if (tDelete == true) {      tDelete = false;      START_TIMER;      execute(stDelete);      STOP_TIMER;      PRINT_TIMER("delete", noOfTransacts, 1);    }//if    ndbout << "--------------------------------------------------" << endl;            execute(stStop);    void * tmp;    for(i = 0; i<tNoOfThreads; i++){      NdbThread_WaitFor(threadLife[i], &tmp);      NdbThread_Destroy(&threadLife[i]);    }  }   delete [] pThreadData;  delete pNdb;  //printing errorCounters  flexTTErrorData->printErrorCounters(ndbout);  return NDBT_ProgramExit(returnValue);}//main()static void execute(StartType aType){  resetThreads();  tellThreads(aType);  waitForThreads();}//execute()static void*threadLoop(void* ThreadData){  Ndb* localNdb;  ThreadNdb* tabThread = (ThreadNdb*)ThreadData;  int loc_threadNo = tabThread->threadNo;  void * mem = malloc(sizeof(TransNdb)*tNoOfParallelTrans);  TransNdb* pTransData = (TransNdb*)mem;  localNdb = new Ndb(g_cluster_connection, "TEST_DB");  localNdb->init(1024);  localNdb->waitUntilReady();  if (tLocal == false) {    tabThread->threadIncrement = 1;  } else {    tabThread->threadIncrement = MAX_SEEK;  }//if  tabThread->threadBase = (loc_threadNo << 16) + tNodeId;  tabThread->threadNdb = localNdb;  tabThread->threadStop = tNoOfParallelTrans * tNoOfTransactions;  tabThread->threadStop *= tabThread->threadIncrement;  tabThread->threadLoopStop = tNoOfLoops;  Uint32 i, j;  for (i = 0; i < tNoOfParallelTrans; i++) {    pTransData[i].transNdb = localNdb;        pTransData[i].transThread = tabThread;        pTransData[i].transOperation = NULL;        pTransData[i].transStartType = stIdle;        pTransData[i].vpn_number = tabThread->threadBase;        pTransData[i].vpn_identity = 0;    pTransData[i].transErrorCount = 0;    for (j = 0; j < 128; j++) {      pTransData[i].transRecord[j] = 0x30;    }//for  }//for  for (;;){    while (ThreadStart[loc_threadNo] == stIdle) {      NdbSleep_MilliSleep(10);    }//while    // Check if signal to exit is received    if (ThreadStart[loc_threadNo] == stStop) {      break;    }//if    tabThread->threadStartType = ThreadStart[loc_threadNo];      tabThread->threadLoopCounter = 0;    tabThread->threadCompleted = false;      tabThread->threadNoCompleted = 0;    tabThread->threadNextStart = 0;    ThreadStart[loc_threadNo] = stIdle;    if(!executeThread(tabThread, pTransData)){      break;    }    ThreadReady[loc_threadNo] = 1;  }//for  free(mem);  delete localNdb;  ThreadReady[loc_threadNo] = 1;  return NULL; // Thread exits}//threadLoop()static boolexecuteThread(ThreadNdb* tabThread, TransNdb* atransDataArrayPtr) {  Uint32 i;  for (i = 0; i < tNoOfParallelTrans; i++) {    TransNdb* transNdbPtr = &atransDataArrayPtr[i];    transNdbPtr->vpn_identity = i * tabThread->threadIncrement;    transNdbPtr->transStartType = tabThread->threadStartType;    if (executeTransaction(transNdbPtr) == false) {      return false;    }//if  }//for  tabThread->threadNextStart = tNoOfParallelTrans * tabThread->threadIncrement;  do {    tabThread->threadNdb->sendPollNdb(3000, tMinEvents, tSendForce);  } while (tabThread->threadCompleted == false);  return true;}//executeThread()staticbool executeTransaction(TransNdb* transNdbRef){  NdbConnection* MyTrans;  ThreadNdb* tabThread = transNdbRef->transThread;  Ndb* aNdbObject = transNdbRef->transNdb;  Uint32 threadBase = tabThread->threadBase;  Uint32 startKey = transNdbRef->vpn_identity;  if (tLocal == true) {    startKey = getKey(startKey, threadBase);  }//if  if (startTransGuess == true) {    Uint32 tKey[2];    tKey[0] = startKey;    tKey[1] = threadBase;    MyTrans = aNdbObject->startTransaction((Uint32)0, //Priority                                         (const char*)&tKey[0],   //Main PKey                                         (Uint32)8);           //Key Length  } else {   MyTrans = aNdbObject->startTransaction();  }//if  if (MyTrans == NULL) {    error_handler(aNdbObject->getNdbError());    ndbout << endl << "Unable to recover! Quiting now" << endl ;    return false;  }//if  //-------------------------------------------------------  // Define the operation, but do not execute it yet.  //-------------------------------------------------------  if (!defineOperation(MyTrans, transNdbRef, startKey, threadBase))    return false;  return true;}//executeTransaction()static Uint32getKey(Uint32 aBase, Uint32 aThreadBase) {  Uint32 Tfound = aBase;  Uint32 hash;  Uint64 Tkey64;  Uint32* tKey32 = (Uint32*)&Tkey64;  tKey32[0] = aThreadBase;  for (int i = aBase; i < (aBase + MAX_SEEK); i++) {    tKey32[1] = (Uint32)i;    hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -