📄 flexasynch.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ndb_global.h>#include "NdbApi.hpp"#include <NdbSchemaCon.hpp>#include <NdbMain.h>#include <md5_hash.hpp>#include <NdbThread.h>#include <NdbSleep.h>#include <NdbTick.h>#include <NdbOut.hpp>#include <NdbTimer.hpp>#include <NDBT_Error.hpp>#include <NdbTest.hpp>#define MAX_PARTS 4 #define MAX_SEEK 16 #define MAXSTRLEN 16 #define MAXATTR 64#define MAXTABLES 64#define MAXTHREADS 128#define MAXPAR 1024#define MAXATTRSIZE 1000#define PKSIZE 2enum StartType { stIdle, stInsert, stRead, stUpdate, stDelete, stStop } ;extern "C" { static void* threadLoop(void*); }static void setAttrNames(void);static void setTableNames(void);static int readArguments(int argc, const char** argv);static int createTables(Ndb*);static void defineOperation(NdbConnection* aTransObject, StartType aType, Uint32 base, Uint32 aIndex);static void execute(StartType aType);static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);static void executeCallback(int result, NdbConnection* NdbObject, void* aObject);static bool error_handler(const NdbError & err);static Uint32 getKey(Uint32, Uint32) ;static void input_error();static int retry_opt = 3 ;static int failed = 0 ; ErrorData * flexAsynchErrorData; struct ThreadNdb{ int NoOfOps; int ThreadNo;};static NdbThread* threadLife[MAXTHREADS];static int tNodeId;static int ThreadReady[MAXTHREADS];static StartType ThreadStart[MAXTHREADS];static char tableName[MAXTABLES][MAXSTRLEN+1];static char attrName[MAXATTR][MAXSTRLEN+1];// Program Parametersstatic bool tLocal = false;static int tLocalPart = 0;static int tSendForce = 0;static int tNoOfLoops = 1;static int tAttributeSize = 1;static unsigned int tNoOfThreads = 1;static unsigned int tNoOfParallelTrans = 32;static unsigned int tNoOfAttributes = 25;static unsigned int tNoOfTransactions = 500;static unsigned int tNoOfOpsPerTrans = 1;static unsigned int tLoadFactor = 80;static bool tempTable = false;static bool startTransGuess = true;//Program Flagsstatic int theTestFlag = 0;static int theSimpleFlag = 0;static int theDirtyFlag = 0;static int theWriteFlag = 0;static int theStdTableNameFlag = 0;static int theTableCreateFlag = 0;#define START_REAL_TIME#define STOP_REAL_TIME#define START_TIMER { NdbTimer timer; timer.doStart();#define STOP_TIMER timer.doStop();#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; static void resetThreads(){ for (int i = 0; i < tNoOfThreads ; i++) { ThreadReady[i] = 0; ThreadStart[i] = stIdle; }//for}static void waitForThreads(void){ int cont = 0; do { cont = 0; NdbSleep_MilliSleep(20); for (int i = 0; i < tNoOfThreads ; i++) { if (ThreadReady[i] == 0) { cont = 1; }//if }//for } while (cont == 1);}static void tellThreads(StartType what){ for (int i = 0; i < tNoOfThreads ; i++) ThreadStart[i] = what;}static Ndb_cluster_connection *g_cluster_connection= 0;NDB_COMMAND(flexAsynch, "flexAsynch", "flexAsynch", "flexAsynch", 65535){ ndb_init(); ThreadNdb* pThreadData; int tLoops=0, i; int returnValue = NDBT_OK; flexAsynchErrorData = new ErrorData; flexAsynchErrorData->resetErrorCounters(); if (readArguments(argc, argv) != 0){ input_error(); return NDBT_ProgramExit(NDBT_WRONGARGS); } pThreadData = new ThreadNdb[MAXTHREADS]; ndbout << endl << "FLEXASYNCH - Starting normal mode" << endl; ndbout << "Perform benchmark of insert, update and delete transactions"; ndbout << endl; ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl; ndbout << " " << tNoOfParallelTrans; ndbout << " number of parallel operation per thread " << endl; ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl; ndbout << " " << tNoOfLoops << " iterations " << endl; ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl; ndbout << " " << tNoOfAttributes << " attributes per table " << endl; ndbout << " " << tAttributeSize; ndbout << " is the number of 32 bit words per attribute " << endl; if (tempTable == true) { ndbout << " Tables are without logging " << endl; } else { ndbout << " Tables are with logging " << endl; }//if if (startTransGuess == true) { ndbout << " Transactions are executed with hint provided" << endl; } else { ndbout << " Transactions are executed with round robin scheme" << endl; }//if if (tSendForce == 0) { ndbout << " No force send is used, adaptive algorithm used" << endl; } else if (tSendForce == 1) { ndbout << " Force send used" << endl; } else { ndbout << " No force send is used, adaptive algorithm disabled" << endl; }//if ndbout << endl; NdbThread_SetConcurrencyLevel(2 + tNoOfThreads); /* print Setting */ flexAsynchErrorData->printSettings(ndbout); setAttrNames(); setTableNames(); Ndb_cluster_connection con; if(con.connect(12, 5, 1) != 0) { return NDBT_ProgramExit(NDBT_FAILED); } g_cluster_connection= &con; Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB"); pNdb->init(); tNodeId = pNdb->getNodeId(); ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl; ndbout << endl; ndbout << "Waiting for ndb to become ready..." <<endl; if (pNdb->waitUntilReady(10000) != 0){ ndbout << "NDB is not ready" << endl; ndbout << "Benchmark failed!" << endl; returnValue = NDBT_FAILED; } if(returnValue == NDBT_OK){ if (createTables(pNdb) != 0){ returnValue = NDBT_FAILED; } } if(returnValue == NDBT_OK){ /**************************************************************** * Create NDB objects. * ****************************************************************/ resetThreads(); for (i = 0; i < tNoOfThreads ; i++) { pThreadData[i].ThreadNo = i; threadLife[i] = NdbThread_Create(threadLoop, (void**)&pThreadData[i], 32768, "flexAsynchThread", NDB_THREAD_PRIO_LOW); }//for ndbout << endl << "All NDB objects and table created" << endl << endl; int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads; /**************************************************************** * Execute program. * ****************************************************************/ for(;;) { int loopCount = tLoops + 1 ; ndbout << endl << "Loop # " << loopCount << endl << endl ; /**************************************************************** * Perform inserts. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stInsert); STOP_TIMER; PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { i = retry_opt ; int ci = 1 ; while (0 < failed && 0 < i){ ndbout << failed << " of the transactions returned errors!" << endl << endl; ndbout << "Attempting to redo the failed transactions now..." << endl ; ndbout << "Redo attempt " << ci <<" out of " << retry_opt << endl << endl; failed = 0 ; START_TIMER; execute(stInsert); STOP_TIMER; PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); i-- ; ci++; } if(0 == failed ){ ndbout << endl <<"Redo attempt succeeded" << endl << endl; }else{ ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl; }//if }//if /**************************************************************** * Perform read. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stRead); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { i = retry_opt ; int cr = 1; while (0 < failed && 0 < i){ ndbout << failed << " of the transactions returned errors!"<<endl ; ndbout << endl; ndbout <<"Attempting to redo the failed transactions now..." << endl; ndbout << endl; ndbout <<"Redo attempt " << cr <<" out of "; ndbout << retry_opt << endl << endl; failed = 0 ; START_TIMER; execute(stRead); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); i-- ; cr++ ; }//while if(0 == failed ) { ndbout << endl <<"Redo attempt succeeded" << endl << endl ; }else{ ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ; }//if }//if /****************************************************************
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -