📄 msa.cpp
字号:
} NDB_TICKS tEndCall = NdbTick_CurrentMillisecond(); long lMillisecForThisCall = (long)(tEndCall-tStartCall); if(g_bReport) { assert(lMillisecForThisCall>=0 && lMillisecForThisCall<c_nMaxMillisecForAllCall); InterlockedIncrement(g_plCountMillisecForCall+lMillisecForThisCall); } if(bTimeLatency) { NdbMutex_Lock(g_pNdbMutexPrintf); printf("Total time for call is %ld msec.\n", (long)lMillisecForThisCall); NdbMutex_Unlock(g_pNdbMutexPrintf); } nNumCallsProcessed++; InterlockedIncrementAndReport(); if(g_nMaxCallsPerSecond>0) { int iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond; iMillisecToSleep -= lMillisecForThisCall; if(iMillisecToSleep>0) { NdbSleep_MilliSleep(iMillisecToSleep); } } } NdbMutex_Lock(g_pNdbMutexPrintf); printf("Terminating thread after %ld calls\n", nNumCallsProcessed); NdbMutex_Unlock(g_pNdbMutexPrintf); delete pNdb; delete[] pchContextData; return 0;}int CreateCallContextTable(Ndb* pNdb, const char* szTableName, bool bStored){ int iRes = -1; NdbError err; memset(&err, 0, sizeof(err)); NdbSchemaCon* pNdbSchemaCon = NdbSchemaCon::startSchemaTrans(pNdb); if(pNdbSchemaCon) { NdbSchemaOp* pNdbSchemaOp = pNdbSchemaCon->getNdbSchemaOp(); if(pNdbSchemaOp) { if(!pNdbSchemaOp->createTable(szTableName, 8, TupleKey, 2, All, 6, 78, 80, 1, bStored) && !pNdbSchemaOp->createAttribute(c_szContextId, TupleKey, 32, 1, Signed) && !pNdbSchemaOp->createAttribute(c_szVersion, NoKey, 32, 1, Signed) && !pNdbSchemaOp->createAttribute(c_szLockFlag, NoKey, 32, 1, Signed) && !pNdbSchemaOp->createAttribute(c_szLockTime, NoKey, 32, 1, Signed) && !pNdbSchemaOp->createAttribute(c_szLockTimeUSec, NoKey, 32, 1, Signed) && !pNdbSchemaOp->createAttribute(c_szContextData, NoKey, 8, g_nStatusDataSize, String)) { if(!pNdbSchemaCon->execute()) iRes = 0; else err = pNdbSchemaCon->getNdbError(); } else err = pNdbSchemaOp->getNdbError(); } else err = pNdbSchemaCon->getNdbError(); NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon); } else err = pNdb->getNdbError(); if(iRes) { ReportNdbError("create call context table failed", err); } return iRes;}void ReportResponseTimeStatistics(const char* szStat, long* plCount, const long lSize){ long lCount = 0; Int64 llSum = 0; Int64 llSum2 = 0; long lMin = -1; long lMax = -1; for(long l=0; l<lSize; ++l) { if(plCount[l]>0) { lCount += plCount[l]; llSum += (Int64)l*(Int64)plCount[l]; llSum2 += (Int64)l*(Int64)l*(Int64)plCount[l]; if(lMin==-1 || l<lMin) { lMin = l; } if(lMax==-1 || l>lMax) { lMax = l; } } } long lAvg = long(llSum/lCount); double dblVar = ((double)lCount*(double)llSum2 - (double)llSum*(double)llSum)/((double)lCount*(double)(lCount-1)); long lStd = long(sqrt(dblVar)); long lMed = -1; long l95 = -1; long lSel = -1; for(long l=lMin; l<=lMax; ++l) { if(plCount[l]>0) { lSel += plCount[l]; if(lMed==-1 && lSel>=(lCount/2)) { lMed = l; } if(l95==-1 && lSel>=((lCount*95)/100)) { l95 = l; } if(g_bReportPlus) { printf("%ld\t%ld\n", l, plCount[l]); } } } printf("%s: Count=%ld, Min=%ld, Max=%ld, Avg=%ld, Std=%ld, Med=%ld, 95%%=%ld\n", szStat, lCount, lMin, lMax, lAvg, lStd, lMed, l95);}void ShowHelp(const char* szCmd){ printf("%s -t<threads> [-s<seed>] [-b<batch>] [-c<maxcps>] [-m<size>] [-d] [-i] [-v] [-f] [-w] [-r[+]]\n", szCmd); printf("%s -?\n", szCmd); puts("-d\t\tcreate the table"); puts("-i\t\tinsert initial records"); puts("-v\t\tverify initial records"); puts("-t<threads>\tnumber of threads making calls"); puts("-s<seed>\toffset for primary key"); puts("-b<batch>\tbatch size per thread"); puts("-c<maxcps>\tmax number of calls per second for this process"); puts("-m<size>\tsize of context data"); puts("-f\t\tno checkpointing and no logging"); puts("-w\t\tuse writeTuple instead of insertTuple"); puts("-r\t\treport response time statistics"); puts("-r+\t\treport response time distribution"); puts("-?\t\thelp");}int main(int argc, char* argv[]){ ndb_init(); int iRes = -1; g_nNumThreads = 0; g_nMaxCallsPerSecond = 0; long nSeed = 0; bool bStoredTable = true; bool bCreateTable = false; g_bWriteTuple = false; g_bReport = false; g_bReportPlus = false; for(int i=1; i<argc; ++i) { if(argv[i][0]=='-' || argv[i][0]=='/') { switch(argv[i][1]) { case 't': g_nNumThreads = atol(argv[i]+2); break; case 's': nSeed = atol(argv[i]+2); break; case 'b': g_nMaxContextIdPerThread = atol(argv[i]+2); break; case 'm': g_nStatusDataSize = atol(argv[i]+2); if(g_nStatusDataSize>sizeof(STATUS_DATA)) { g_nStatusDataSize = sizeof(STATUS_DATA); } break; case 'i': g_bInsertInitial = true; break; case 'v': g_bVerifyInitial = true; break; case 'd': bCreateTable = true; break; case 'f': bStoredTable = false; break; case 'w': g_bWriteTuple = true; break; case 'r': g_bReport = true; if(argv[i][2]=='+') { g_bReportPlus = true; } break; case 'c': g_nMaxCallsPerSecond = atol(argv[i]+2); break; case '?': default: ShowHelp(argv[0]); return -1; } } else { ShowHelp(argv[0]); return -1; } } if(bCreateTable) puts("-d\tcreate the table"); if(g_bInsertInitial) printf("-i\tinsert initial records\n"); if(g_bVerifyInitial) printf("-v\tverify initial records\n"); if(g_nNumThreads>0) printf("-t%ld\tnumber of threads making calls\n", g_nNumThreads); if(g_nNumThreads>0) { printf("-s%ld\toffset for primary key\n", nSeed); printf("-b%ld\tbatch size per thread\n", g_nMaxContextIdPerThread); } if(g_nMaxCallsPerSecond>0) printf("-c%ld\tmax number of calls per second for this process\n", g_nMaxCallsPerSecond); if(!bStoredTable) puts("-f\tno checkpointing and no logging to disk"); if(g_bWriteTuple) puts("-w\tuse writeTuple instead of insertTuple"); if(g_bReport) puts("-r\treport response time statistics"); if(g_bReportPlus) puts("-r+\treport response time distribution"); if(!bCreateTable && g_nNumThreads<=0) { ShowHelp(argv[0]); return -1; } printf("-m%ld\tsize of context data\n", g_nStatusDataSize); g_szTableName = (bStoredTable ? c_szTableNameStored : c_szTableNameTemp); #ifdef NDB_WIN32 SetConsoleCtrlHandler(ConsoleCtrlHandler, true); #else signal(SIGINT, CtrlCHandler);#endif if(g_bReport) { g_plCountMillisecForCall = new long[c_nMaxMillisecForAllCall]; memset(g_plCountMillisecForCall, 0, c_nMaxMillisecForAllCall*sizeof(long)); g_plCountMillisecForTrans = new long[c_nMaxMillisecForAllTrans]; memset(g_plCountMillisecForTrans, 0, c_nMaxMillisecForAllTrans*sizeof(long)); } g_pNdbMutexIncrement = NdbMutex_Create(); g_pNdbMutexPrintf = NdbMutex_Create();#ifdef NDB_WIN32 hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);#endif Ndb* pNdb = new Ndb(c_szDatabaseName); if(!pNdb) { printf("could not construct ndb\n"); return 1; } if(pNdb->init(1) || pNdb->waitUntilReady()) { ReportNdbError("could not initialize ndb\n", pNdb->getNdbError()); delete pNdb; return 2; } if(bCreateTable) { printf("Create CallContext table\n"); if (bStoredTable) { if (CreateCallContextTable(pNdb, c_szTableNameStored, true)) { printf("Create table failed\n"); delete pNdb; return 3; } } else { if (CreateCallContextTable(pNdb, c_szTableNameTemp, false)) { printf("Create table failed\n"); delete pNdb; return 3; } } } if(g_nNumThreads>0) { printf("creating %d threads\n", (int)g_nNumThreads); if(g_bInsertInitial) { printf("each thread will insert %ld initial records, total %ld inserts\n", g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread); } if(g_bVerifyInitial) { printf("each thread will verify %ld initial records, total %ld reads\n", g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread); } g_nNumberOfInitialInsert = 0; g_nNumberOfInitialVerify = 0; NDB_TICKS tStartTime = NdbTick_CurrentMillisecond(); NdbThread* pThreads[256]; int pnStartingRecordNum[256]; int ij; for(ij=0;ij<g_nNumThreads;ij++) { pnStartingRecordNum[ij] = (ij*g_nMaxContextIdPerThread) + nSeed; } for(ij=0;ij<g_nNumThreads;ij++) { pThreads[ij] = NdbThread_Create(RuntimeCallContext, (void**)(pnStartingRecordNum+ij), 0, "RuntimeCallContext", NDB_THREAD_PRIO_LOW); } //Wait for the threads to finish for(ij=0;ij<g_nNumThreads;ij++) { void* status; NdbThread_WaitFor(pThreads[ij], &status); } NDB_TICKS tEndTime = NdbTick_CurrentMillisecond(); //Print time taken printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n", g_nNumCallsProcessed, (long)(tEndTime-tStartTime), (long)((1000*g_nNumCallsProcessed)/(tEndTime-tStartTime))); if(g_bInsertInitial) printf("successfully inserted %ld tuples\n", g_nNumberOfInitialInsert); if(g_bVerifyInitial) printf("successfully verified %ld tuples\n", g_nNumberOfInitialVerify); } delete pNdb;#ifdef NDB_WIN32 CloseHandle(hShutdownEvent);#endif NdbMutex_Destroy(g_pNdbMutexIncrement); NdbMutex_Destroy(g_pNdbMutexPrintf); if(g_bReport) { ReportResponseTimeStatistics("Calls", g_plCountMillisecForCall, c_nMaxMillisecForAllCall); ReportResponseTimeStatistics("Transactions", g_plCountMillisecForTrans, c_nMaxMillisecForAllTrans); delete[] g_plCountMillisecForCall; delete[] g_plCountMillisecForTrans; } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -