📄 flextt.cpp
字号:
hash = (hash >> 6) & (MAX_PARTS - 1); if (hash == tLocalPart) { Tfound = i; break; }//if }//for return Tfound;}//getKey()static voidexecuteCallback(int result, NdbConnection* NdbObject, void* aObject){ TransNdb* transNdbRef = (TransNdb*)aObject; ThreadNdb* tabThread = transNdbRef->transThread; Ndb* tNdb = transNdbRef->transNdb; Uint32 vpn_id = transNdbRef->vpn_identity; Uint32 vpn_nb = tabThread->threadBase; if (result == -1) {// Add complete error handling here int retCode = flexTTErrorData->handleErrorCommon(NdbObject->getNdbError()); if (retCode == 1) { if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630) { ndbout_c("execute: %s", NdbObject->getNdbError().message); ndbout_c("Error code = %d", NdbObject->getNdbError().code); } } else if (retCode == 2) { ndbout << "4115 should not happen in flexTT" << endl; } else if (retCode == 3) { /* What can we do here? */ ndbout_c("execute: %s", NdbObject->getNdbError().message); }//if(retCode == 3) transNdbRef->transErrorCount++; const NdbError & err = NdbObject->getNdbError(); switch (err.classification) { case NdbError::NoDataFound: case NdbError::ConstraintViolation: ndbout << "Error with vpn_id = " << vpn_id << " and vpn_nb = "; ndbout << vpn_nb << endl; ndbout << err << endl; goto checkCompleted; case NdbError::OverloadError: NdbSleep_MilliSleep(10); case NdbError::NodeRecoveryError: case NdbError::UnknownResultError: case NdbError::TimeoutExpired: break; default: goto checkCompleted; }//if if ((transNdbRef->transErrorCount > 10) || (tabThread->threadNoCompleted > 0)) { goto checkCompleted; }//if } else { if (tabThread->threadNoCompleted == 0) { transNdbRef->transErrorCount = 0; transNdbRef->vpn_identity = tabThread->threadNextStart; if (tabThread->threadNextStart == tabThread->threadStop) { tabThread->threadLoopCounter++; transNdbRef->vpn_identity = 0; tabThread->threadNextStart = 0; if (tabThread->threadLoopCounter == tNoOfLoops) { goto checkCompleted; }//if }//if tabThread->threadNextStart += tabThread->threadIncrement; } else { goto checkCompleted; }//if }//if tNdb->closeTransaction(NdbObject); executeTransaction(transNdbRef); return;checkCompleted: tNdb->closeTransaction(NdbObject); tabThread->threadNoCompleted++; if (tabThread->threadNoCompleted == tNoOfParallelTrans) { tabThread->threadCompleted = true; }//if return; }//executeCallback()staticStartTyperandom_choice(){//----------------------------------------------------// Generate a random key between 0 and tNoOfRecords - 1//---------------------------------------------------- UintR random_number = lrand48() % 100; if (random_number < tUpdateFreq) return stUpdate; else return stRead;}//random_choice()static booldefineOperation(NdbConnection* localNdbConnection, TransNdb* transNdbRef, unsigned int vpn_id, unsigned int vpn_nb){ NdbOperation* localNdbOperation; StartType TType = transNdbRef->transStartType; //------------------------------------------------------- // Set-up the attribute values for this operation. //------------------------------------------------------- localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]); if (localNdbOperation == NULL) { error_handler(localNdbConnection->getNdbError()); return false; }//if switch (TType) { case stInsert: // Insert case if (theWriteFlag == 1 && theDirtyFlag == 1) { localNdbOperation->dirtyWrite(); } else if (theWriteFlag == 1) { localNdbOperation->writeTuple(); } else { localNdbOperation->insertTuple(); }//if break; case stRead: // Read Case TType = random_choice(); if (TType == stRead) { if (theSimpleFlag == 1) { localNdbOperation->simpleRead(); } else if (theDirtyFlag == 1) { localNdbOperation->dirtyRead(); } else { localNdbOperation->readTuple(); }//if } else { if (theWriteFlag == 1 && theDirtyFlag == 1) { localNdbOperation->dirtyWrite(); } else if (theWriteFlag == 1) { localNdbOperation->writeTuple(); } else if (theDirtyFlag == 1) { localNdbOperation->dirtyUpdate(); } else { localNdbOperation->updateTuple(); }//if }//if break; case stDelete: // Delete Case localNdbOperation->deleteTuple(); break; default: error_handler(localNdbOperation->getNdbError()); }//switch localNdbOperation->equal((Uint32)0,vpn_id); localNdbOperation->equal((Uint32)1,vpn_nb); char* attrValue = &transNdbRef->transRecord[0]; switch (TType) { case stInsert: // Insert case localNdbOperation->setValue((Uint32)2, attrValue); localNdbOperation->setValue((Uint32)3, attrValue); localNdbOperation->setValue((Uint32)4, attrValue); break; case stUpdate: // Update Case localNdbOperation->setValue((Uint32)3, attrValue); break; case stRead: // Read Case localNdbOperation->getValue((Uint32)2, attrValue); localNdbOperation->getValue((Uint32)3, attrValue); localNdbOperation->getValue((Uint32)4, attrValue); break; case stDelete: // Delete Case break; default: error_handler(localNdbOperation->getNdbError()); }//switch localNdbConnection->executeAsynchPrepare(Commit, &executeCallback, (void*)transNdbRef); return true;}//defineOperation()static void setAttrNames(){ BaseString::snprintf(attrName[0], MAXSTRLEN, "VPN_ID"); BaseString::snprintf(attrName[1], MAXSTRLEN, "VPN_NB"); BaseString::snprintf(attrName[2], MAXSTRLEN, "DIRECTORY_NB"); BaseString::snprintf(attrName[3], MAXSTRLEN, "LAST_CALL_PARTY"); BaseString::snprintf(attrName[4], MAXSTRLEN, "DESCR");}static void setTableNames(){ BaseString::snprintf(tableName[0], MAXSTRLEN, "VPN_USERS");}staticint createTables(Ndb* pMyNdb){ NdbSchemaCon *MySchemaTransaction; NdbSchemaOp *MySchemaOp; int check; if (theTableCreateFlag == 0) { ndbout << "Creating Table: vpn_users " << "..." << endl; MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb); if(MySchemaTransaction == NULL && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; MySchemaOp = MySchemaTransaction->getNdbSchemaOp(); if(MySchemaOp == NULL && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; check = MySchemaOp->createTable( tableName[0] ,8 // Table Size ,TupleKey // Key Type ,40 // Nr of Pages ,All ,6 ,(tLoadFactor - 5) ,tLoadFactor ,1 ,!tempTable ); if (check == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; check = MySchemaOp->createAttribute( (char*)attrName[0], TupleKey, 32, 1, UnSigned, MMBased, NotNullAttribute ); if (check == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; check = MySchemaOp->createAttribute( (char*)attrName[1], TupleKey, 32, 1, UnSigned, MMBased, NotNullAttribute ); if (check == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; check = MySchemaOp->createAttribute( (char*)attrName[2], NoKey, 8, 10, UnSigned, MMBased, NotNullAttribute ); if (check == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; check = MySchemaOp->createAttribute( (char*)attrName[3], NoKey, 8, 10, UnSigned, MMBased, NotNullAttribute ); if (check == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; check = MySchemaOp->createAttribute( (char*)attrName[4], NoKey, 8, 100, UnSigned, MMBased, NotNullAttribute ); if (check == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; if (MySchemaTransaction->execute() == -1 && (!error_handler(MySchemaTransaction->getNdbError()))) return -1; NdbSchemaCon::closeSchemaTrans(MySchemaTransaction); }//if return 0;}bool error_handler(const NdbError& err){ ndbout << err << endl ; switch(err.classification){ case NdbError::NodeRecoveryError: case NdbError::SchemaError: case NdbError::TimeoutExpired: ndbout << endl << "Attempting to recover and continue now..." << endl ; return true ; // return true to retry } return false;}#if 0bool error_handler(const char* error_string, int error_int) { ndbout << error_string << endl ; if ((4008 == error_int) || (677 == error_int) || (891 == error_int) || (1221 == error_int) || (721 == error_int) || (266 == error_int)) { ndbout << endl << "Attempting to recover and continue now..." << endl ; return true ; // return true to retry } return false ; // return false to abort}#endifstaticint readArguments(int argc, const char** argv){ int i = 1; while (argc > 1){ if (strcmp(argv[i], "-t") == 0){ tNoOfThreads = atoi(argv[i+1]); if ((tNoOfThreads < 1) || (tNoOfThreads > MAXTHREADS)){ ndbout_c("Invalid no of threads"); return -1; } } else if (strcmp(argv[i], "-p") == 0){ tNoOfParallelTrans = atoi(argv[i+1]); if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){ ndbout_c("Invalid no of parallell transactions"); return -1; } } else if (strcmp(argv[i], "-o") == 0) { tNoOfTransactions = atoi(argv[i+1]); if (tNoOfTransactions < 1){ ndbout_c("Invalid no of transactions"); return -1; } } else if (strcmp(argv[i], "-l") == 0){ tNoOfLoops = atoi(argv[i+1]); if (tNoOfLoops < 1) { ndbout_c("Invalid no of loops"); return -1; } } else if (strcmp(argv[i], "-e") == 0){ tMinEvents = atoi(argv[i+1]); if ((tMinEvents < 1) || (tMinEvents > tNoOfParallelTrans)) { ndbout_c("Invalid no of loops"); return -1; } } else if (strcmp(argv[i], "-local") == 0){ tLocalPart = atoi(argv[i+1]); tLocal = true; startTransGuess = true; if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){ ndbout_c("Invalid local part"); return -1; } } else if (strcmp(argv[i], "-ufreq") == 0){ tUpdateFreq = atoi(argv[i+1]); if ((tUpdateFreq < 0) || (tUpdateFreq > 100)){ ndbout_c("Invalid Update Frequency"); return -1; } } else if (strcmp(argv[i], "-load_factor") == 0){ tLoadFactor = atoi(argv[i+1]); if ((tLoadFactor < 40) || (tLoadFactor >= 100)){ ndbout_c("Invalid LoadFactor"); return -1; } } else if (strcmp(argv[i], "-d") == 0){ tDelete = true; argc++; i--; } else if (strcmp(argv[i], "-i") == 0){ tInsert = true; argc++; i--; } else if (strcmp(argv[i], "-simple") == 0){ theSimpleFlag = 1; argc++; i--; } else if (strcmp(argv[i], "-adaptive") == 0){ tSendForce = 0; argc++; i--; } else if (strcmp(argv[i], "-force") == 0){ tSendForce = 1; argc++; i--; } else if (strcmp(argv[i], "-non_adaptive") == 0){ tSendForce = 2; argc++; i--; } else if (strcmp(argv[i], "-write") == 0){ theWriteFlag = 1; argc++; i--; } else if (strcmp(argv[i], "-dirty") == 0){ theDirtyFlag = 1; argc++; i--; } else if (strcmp(argv[i], "-table_create") == 0){ theTableCreateFlag = 0; tInsert = true; argc++; i--; } else if (strcmp(argv[i], "-temp") == 0){ tempTable = true; argc++; i--; } else if (strcmp(argv[i], "-no_hint") == 0){ startTransGuess = false; argc++; i--; } else { return -1; } argc -= 2; i = i + 2; }//while if (tLocal == true) { if (startTransGuess == false) { ndbout_c("Not valid to use no_hint with local"); }//if }//if return 0;}staticvoidinput_error(){ ndbout_c("FLEXTT"); ndbout_c(" Perform benchmark of insert, update and delete transactions"); ndbout_c(""); ndbout_c("Arguments:"); ndbout_c(" -t Number of threads to start, default 1"); ndbout_c(" -p Number of parallel transactions per thread, default 32"); ndbout_c(" -o Number of transactions per loop, default 500"); ndbout_c(" -ufreq Number Update Frequency in percent (0 -> 100), rest is read"); ndbout_c(" -load_factor Number Fill level in index in percent (40 -> 99)"); ndbout_c(" -l Number of loops to run, default 1, 0=infinite"); ndbout_c(" -i Start by inserting all records"); ndbout_c(" -d End by deleting all records (only one loop)"); ndbout_c(" -simple Use simple read to read from database"); ndbout_c(" -dirty Use dirty read to read from database"); ndbout_c(" -write Use writeTuple in insert and update"); ndbout_c(" -n Use standard table names"); ndbout_c(" -table_create Create tables in db"); ndbout_c(" -temp Create table(s) without logging"); ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator"); ndbout_c(" -adaptive Use adaptive send algorithm (default)"); ndbout_c(" -force Force send when communicating"); ndbout_c(" -non_adaptive Send at a 10 millisecond interval"); ndbout_c(" -local Number of part, only use keys in one part out of 16");}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -