⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 flexasynch.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
       * Perform update.                                              *       ****************************************************************/            failed = 0 ;                START_TIMER;      execute(stUpdate);      STOP_TIMER;      PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;      if (0 < failed) {        i = retry_opt ;        int cu = 1 ;        while (0 < failed && 0 < i){          ndbout << failed << " of the transactions returned errors!"<<endl ;          ndbout << endl;          ndbout <<"Attempting to redo the failed transactions now..." << endl;          ndbout << endl <<"Redo attempt " << cu <<" out of ";          ndbout << retry_opt << endl << endl;          failed = 0 ;          START_TIMER;          execute(stUpdate);          STOP_TIMER;          PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);          i-- ;          cu++ ;        }//while        if(0 == failed ){          ndbout << endl <<"Redo attempt succeeded" << endl << endl;        } else {          ndbout << endl;          ndbout <<"Redo attempt failed, moving on now..." << endl << endl;        }//if      }//if                /****************************************************************       * Perform read.                                             *       ****************************************************************/            failed = 0 ;                START_TIMER;      execute(stRead);      STOP_TIMER;      PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);      if (0 < failed) {        i = retry_opt ;        int cr2 = 1 ;        while (0 < failed && 0 < i){          ndbout << failed << " of the transactions returned errors!"<<endl ;          ndbout << endl;          ndbout <<"Attempting to redo the failed transactions now..." << endl;          ndbout << endl <<"Redo attempt " << cr2 <<" out of ";          ndbout << retry_opt << endl << endl;          failed = 0 ;          START_TIMER;          execute(stRead);          STOP_TIMER;          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);          i-- ;          cr2++ ;        }//while        if(0 == failed ){          ndbout << endl <<"Redo attempt succeeded" << endl << endl;        }else{          ndbout << endl;          ndbout << "Redo attempt failed, moving on now..." << endl << endl;        }//if      }//if                /****************************************************************       * Perform delete.                                              *       ****************************************************************/            failed = 0 ;                START_TIMER;      execute(stDelete);      STOP_TIMER;      PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);      if (0 < failed) {        i = retry_opt ;        int cd = 1 ;        while (0 < failed && 0 < i){          ndbout << failed << " of the transactions returned errors!"<< endl ;          ndbout << endl;          ndbout <<"Attempting to redo the failed transactions now:" << endl ;          ndbout << endl <<"Redo attempt " << cd <<" out of ";          ndbout << retry_opt << endl << endl;          failed = 0 ;          START_TIMER;          execute(stDelete);          STOP_TIMER;          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);          i-- ;          cd++ ;        }//while        if(0 == failed ){          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;        }else{          ndbout << endl;          ndbout << "Redo attempt failed, moving on now..." << endl << endl;        }//if      }//if                tLoops++;      ndbout << "--------------------------------------------------" << endl;          if(tNoOfLoops != 0){        if(tNoOfLoops <= tLoops)          break ;      }    }//for        execute(stStop);    void * tmp;    for(i = 0; i<tNoOfThreads; i++){      NdbThread_WaitFor(threadLife[i], &tmp);      NdbThread_Destroy(&threadLife[i]);    }  }   delete [] pThreadData;  delete pNdb;  //printing errorCounters  flexAsynchErrorData->printErrorCounters(ndbout);  return NDBT_ProgramExit(returnValue);}//main()static void execute(StartType aType){  resetThreads();  tellThreads(aType);  waitForThreads();}//execute()static void*threadLoop(void* ThreadData){  Ndb* localNdb;  StartType tType;  ThreadNdb* tabThread = (ThreadNdb*)ThreadData;  int threadNo = tabThread->ThreadNo;  localNdb = new Ndb(g_cluster_connection, "TEST_DB");  localNdb->init(1024);  localNdb->waitUntilReady(10000);  unsigned int threadBase = (threadNo << 16) + tNodeId ;    for (;;){    while (ThreadStart[threadNo] == stIdle) {      NdbSleep_MilliSleep(10);    }//while    // Check if signal to exit is received    if (ThreadStart[threadNo] == stStop) {      break;    }//if    tType = ThreadStart[threadNo];    ThreadStart[threadNo] = stIdle;    if(!executeThread(tType, localNdb, threadBase)){      break;    }    ThreadReady[threadNo] = 1;  }//for  delete localNdb;  ThreadReady[threadNo] = 1;  return NULL;}//threadLoop()static boolexecuteThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {  int i, j, k;  NdbConnection* tConArray[1024];  unsigned int tBase;  unsigned int tBase2;  for (i = 0; i < tNoOfTransactions; i++) {    if (tLocal == false) {      tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;    } else {      tBase = i * tNoOfParallelTrans * MAX_SEEK;    }//if    START_REAL_TIME;    for (j = 0; j < tNoOfParallelTrans; j++) {      if (tLocal == false) {        tBase2 = tBase + (j * tNoOfOpsPerTrans);      } else {        tBase2 = tBase + (j * MAX_SEEK);        tBase2 = getKey(threadBase, tBase2);      }//if      if (startTransGuess == true) {        Uint64 Tkey64;        Uint32* Tkey32 = (Uint32*)&Tkey64;        Tkey32[0] = threadBase;        Tkey32[1] = tBase2;        tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority                                         (const char*)&Tkey64, //Main PKey                                         (Uint32)4);           //Key Length      } else {        tConArray[j] = aNdbObject->startTransaction();      }//if      if (tConArray[j] == NULL &&           !error_handler(aNdbObject->getNdbError()) ){        ndbout << endl << "Unable to recover! Quiting now" << endl ;        return false;      }//if            for (k = 0; k < tNoOfOpsPerTrans; k++) {        //-------------------------------------------------------        // Define the operation, but do not execute it yet.        //-------------------------------------------------------        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));      }//for            tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);    }//for    STOP_REAL_TIME;    //-------------------------------------------------------    // Now we have defined a set of operations, it is now time    // to execute all of them.    //-------------------------------------------------------    int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);    while (Tcomp < tNoOfParallelTrans) {      int TlocalComp = aNdbObject->pollNdb(3000, 0);      Tcomp += TlocalComp;    }//while    for (j = 0 ; j < tNoOfParallelTrans ; j++) {      aNdbObject->closeTransaction(tConArray[j]);    }//for  }//for  return true;}//executeThread()static Uint32getKey(Uint32 aBase, Uint32 anIndex) {  Uint32 Tfound = anIndex;  Uint64 Tkey64;  Uint32* Tkey32 = (Uint32*)&Tkey64;  Tkey32[0] = aBase;  Uint32 hash;  for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {    Tkey32[1] = (Uint32)i;    hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);    hash = (hash >> 6) & (MAX_PARTS - 1);    if (hash == tLocalPart) {      Tfound = i;      break;    }//if  }//for  return Tfound;}//getKey()static voidexecuteCallback(int result, NdbConnection* NdbObject, void* aObject){  if (result == -1) {              // Add complete error handling here              int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());              if (retCode == 1) {		if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){                ndbout_c("execute: %s", NdbObject->getNdbError().message);                ndbout_c("Error code = %d", NdbObject->getNdbError().code);}              } else if (retCode == 2) {                ndbout << "4115 should not happen in flexAsynch" << endl;              } else if (retCode == 3) {		/* What can we do here? */                ndbout_c("execute: %s", NdbObject->getNdbError().message);                 }//if(retCode == 3)	      //    ndbout << "Error occured in poll:" << endl;	      //    ndbout << NdbObject->getNdbError() << endl;    failed++ ;    return;  }//if  return;}//executeCallback()static voiddefineOperation(NdbConnection* localNdbConnection, StartType aType,                Uint32 threadBase, Uint32 aIndex){  NdbOperation*  localNdbOperation;  unsigned int   loopCountAttributes = tNoOfAttributes;  unsigned int   countAttributes;  Uint32         attrValue[MAXATTRSIZE];  //-------------------------------------------------------  // Set-up the attribute values for this operation.  //-------------------------------------------------------  attrValue[0] = threadBase;  attrValue[1] = aIndex;  for (int k = 2; k < loopCountAttributes; k++) {    attrValue[k] = aIndex;  }//for  localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);          if (localNdbOperation == NULL) {    error_handler(localNdbConnection->getNdbError());  }//if  switch (aType) {  case stInsert: {   // Insert case    if (theWriteFlag == 1 && theDirtyFlag == 1) {      localNdbOperation->dirtyWrite();    } else if (theWriteFlag == 1) {      localNdbOperation->writeTuple();    } else {      localNdbOperation->insertTuple();    }//if    break;  }//case  case stRead: {     // Read Case    if (theSimpleFlag == 1) {      localNdbOperation->simpleRead();    } else if (theDirtyFlag == 1) {      localNdbOperation->dirtyRead();    } else {      localNdbOperation->readTuple();    }//if    break;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -