⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbif.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
  Uint32         i;  if (aNoOfCompletedTrans > 0) {    for (i = 0; i < aNoOfCompletedTrans; i++) {      void* anyObject = aCopyArray[i]->theCallbackObject;      NdbAsynchCallback aCallback = aCopyArray[i]->theCallbackFunction;      int tResult = 0;      if (aCallback != NULL) {        if (aCopyArray[i]->theReturnStatus == NdbTransaction::ReturnFailure) {          tResult = -1;        }//if        (*aCallback)(tResult, aCopyArray[i], anyObject);      }//if    }//for  }//if}//Ndb::reportCallback()/*****************************************************************************Uint32 pollCompleted(NdbTransaction** aCopyArray);Remark:   Transfer the data from the completed transaction to a local array.          This support is used by a number of the poll-methods.******************************************************************************/Uint32	Ndb::pollCompleted(NdbTransaction** aCopyArray){  check_send_timeout();  Uint32         i;  Uint32 tNoCompletedTransactions = theNoOfCompletedTransactions;  if (tNoCompletedTransactions > 0) {    for (i = 0; i < tNoCompletedTransactions; i++) {      aCopyArray[i] = theCompletedTransactionsArray[i];      if (aCopyArray[i]->theListState != NdbTransaction::InCompletedList) {        ndbout << "pollCompleted error ";        ndbout << (int) aCopyArray[i]->theListState << endl;	abort();      }//if      theCompletedTransactionsArray[i] = NULL;      aCopyArray[i]->theListState = NdbTransaction::NotInList;    }//for  }//if  theNoOfCompletedTransactions = 0;  return tNoCompletedTransactions;}//Ndb::pollCompleted()voidNdb::check_send_timeout(){  Uint32 timeout = TransporterFacade::instance()->m_waitfor_timeout;  NDB_TICKS current_time = NdbTick_CurrentMillisecond();  if (current_time - the_last_check_time > 1000) {    the_last_check_time = current_time;    Uint32 no_of_sent = theNoOfSentTransactions;    for (Uint32 i = 0; i < no_of_sent; i++) {      NdbTransaction* a_con = theSentTransactionsArray[i];      if ((current_time - a_con->theStartTransTime) > timeout)      {#ifdef VM_TRACE        a_con->printState();	Uint32 t1 = a_con->theTransactionId;	Uint32 t2 = a_con->theTransactionId >> 32;	ndbout_c("4012 [%.8x %.8x]", t1, t2);	//abort();#endif        a_con->theReleaseOnClose = true;        a_con->setOperationErrorCodeAbort(4012);	a_con->theCommitStatus = NdbTransaction::NeedAbort;        a_con->theCompletionStatus = NdbTransaction::CompletedFailure;        a_con->handleExecuteCompletion();        remove_sent_list(i);        insert_completed_list(a_con);        no_of_sent--;        i--;      }//if    }//for  }//if}voidNdb::remove_sent_list(Uint32 list_index){  Uint32 last_index = theNoOfSentTransactions - 1;  if (list_index < last_index) {    NdbTransaction* t_con = theSentTransactionsArray[last_index];    theSentTransactionsArray[list_index] = t_con;  }//if  theNoOfSentTransactions = last_index;  theSentTransactionsArray[last_index] = 0;}Uint32Ndb::insert_completed_list(NdbTransaction* a_con){  Uint32 no_of_comp = theNoOfCompletedTransactions;  theCompletedTransactionsArray[no_of_comp] = a_con;  theNoOfCompletedTransactions = no_of_comp + 1;  a_con->theListState = NdbTransaction::InCompletedList;  a_con->theTransArrayIndex = no_of_comp;  return no_of_comp;}Uint32Ndb::insert_sent_list(NdbTransaction* a_con){  Uint32 no_of_sent = theNoOfSentTransactions;  theSentTransactionsArray[no_of_sent] = a_con;  theNoOfSentTransactions = no_of_sent + 1;  a_con->theListState = NdbTransaction::InSendList;  a_con->theTransArrayIndex = no_of_sent;  return no_of_sent;}/*****************************************************************************void sendPrepTrans(int forceSend);Remark: Send a batch of transactions prepared for sending to the NDB kernel.  ******************************************************************************/voidNdb::sendPrepTrans(int forceSend){  // Always called when holding mutex on TransporterFacade  /*     We will send a list of transactions to the NDB kernel. Before     sending we check the following.     1) Node connected to is still alive        Checked by both checking node status and node sequence     2) Send buffer can handle the size of messages we are planning to send        So far this is just a fake check but will soon be a real check     When the connected node has failed we abort the transaction without     responding anymore to the node since the kernel will clean up     automatically.     When sendBuffer cannot handle anymore messages then we will also abort     transaction but by communicating to the kernel since it is still alive     and we keep a small space for messages like that.  */  Uint32 i;  TransporterFacade* tp = TransporterFacade::instance();  Uint32 no_of_prep_trans = theNoOfPreparedTransactions;  for (i = 0; i < no_of_prep_trans; i++) {    NdbTransaction * a_con = thePreparedTransactionsArray[i];    thePreparedTransactionsArray[i] = NULL;    Uint32 node_id = a_con->getConnectedNodeId();    if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) &&         tp->get_node_alive(node_id) ||         (tp->get_node_stopping(node_id) &&          ((a_con->theSendStatus == NdbTransaction::sendABORT) ||          (a_con->theSendStatus == NdbTransaction::sendABORTfail) ||          (a_con->theSendStatus == NdbTransaction::sendCOMMITstate) ||          (a_con->theSendStatus == NdbTransaction::sendCompleted)))) {      /*      We will send if      1) Node is alive and sequences are correct OR      2) Node is stopping and we only want to commit or abort      In a graceful stop situation we want to ensure quick aborts      of all transactions and commits and thus we allow aborts and      commits to continue but not normal operations.      */      if (tp->check_send_size(node_id, a_con->get_send_size())) {        if (a_con->doSend() == 0) {          NDB_TICKS current_time = NdbTick_CurrentMillisecond();          a_con->theStartTransTime = current_time;          continue;        } else {          /*          Although all precautions we did not manage to send the operations          Must have been a dropped connection on the transporter side.          We don't expect to be able to continue using this connection so          we will treat it as a node failure.          */          TRACE_DEBUG("Send problem even after checking node status");        }//if      } else {        /*        The send buffer is currently full or at least close to. We will        not allow a send to continue. We will set the connection so that        it is indicated that we need to abort the transaction. If we were        trying to commit or abort and got a send buffer we will not try        again and will thus set the state to Aborted to avoid a more or        less eternal loop of tries.        */        if (a_con->theSendStatus == NdbTransaction::sendOperations) {          a_con->setOperationErrorCodeAbort(4021);          a_con->theCommitStatus = NdbTransaction::NeedAbort;          TRACE_DEBUG("Send buffer full and sendOperations");        } else {          a_con->setOperationErrorCodeAbort(4026);          a_con->theCommitStatus = NdbTransaction::Aborted;          TRACE_DEBUG("Send buffer full, set state to Aborted");        }//if      }//if    } else {#ifdef VM_TRACE      a_con->printState();#endif      if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) &&          tp->get_node_stopping(node_id)) {        /*        The node we are connected to is currently in an early stopping phase        of a graceful stop. We will not send the prepared transactions. We        will simply refuse and let the application code handle the abort.        */        TRACE_DEBUG("Abort a transaction when stopping a node");        a_con->setOperationErrorCodeAbort(4023);        a_con->theCommitStatus = NdbTransaction::NeedAbort;      } else {        /*        The node is hard dead and we cannot continue. We will also release        the connection to the free pool.        */        TRACE_DEBUG("The node was stone dead, inform about abort");        a_con->setOperationErrorCodeAbort(4025);        a_con->theReleaseOnClose = true;        a_con->theTransactionIsStarted = false;        a_con->theCommitStatus = NdbTransaction::Aborted;      }//if    }//if    a_con->theReturnStatus = NdbTransaction::ReturnFailure;    a_con->theCompletionStatus = NdbTransaction::CompletedFailure;    a_con->handleExecuteCompletion();    insert_completed_list(a_con);  }//for  theNoOfPreparedTransactions = 0;  if (forceSend == 0) {     tp->checkForceSend(theNdbBlockNumber);  } else if (forceSend == 1) {     tp->forceSend(theNdbBlockNumber);  }//if  return;}//Ndb::sendPrepTrans()/*****************************************************************************void waitCompletedTransactions(int aMilliSecondsToWait, int noOfEventsToWaitFor);Remark:   First send all prepared operations and then check if there are any          transactions already completed. Do not wait for not completed          transactions.******************************************************************************/void	Ndb::waitCompletedTransactions(int aMilliSecondsToWait, 			       int noOfEventsToWaitFor){  theImpl->theWaiter.m_state = NO_WAIT;   /**   * theImpl->theWaiter.m_state = NO_WAIT;    * To ensure no messup with synchronous node fail handling   * (see ReportFailure)   */  int waitTime = aMilliSecondsToWait;  NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + (NDB_TICKS)waitTime;  theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;  do {    if (waitTime < 1000) waitTime = 1000;    NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,			     (NdbMutex*)theImpl->theWaiter.m_mutex,			     waitTime);    if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {      break;    }//if    theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;    waitTime = (int)(maxTime - NdbTick_CurrentMillisecond());  } while (waitTime > 0);  return;}//Ndb::waitCompletedTransactions()/*****************************************************************************void sendPreparedTransactions(int forceSend = 0);Remark:   First send all prepared operations and then check if there are any          transactions already completed. Do not wait for not completed          transactions.******************************************************************************/void	Ndb::sendPreparedTransactions(int forceSend){  TransporterFacade::instance()->lock_mutex();  sendPrepTrans(forceSend);  TransporterFacade::instance()->unlock_mutex();  return;}//Ndb::sendPreparedTransactions()/*****************************************************************************int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup = 1, int forceSend = 0);Remark:   First send all prepared operations and then check if there are any          transactions already completed. Wait for not completed          transactions until the specified number have completed or until the          timeout has occured. Timeout zero means no waiting time.******************************************************************************/int	Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend){  NdbTransaction* tConArray[1024];  Uint32         tNoCompletedTransactions;  //theCurrentConnectCounter = 0;  //theCurrentConnectIndex++;  TransporterFacade::instance()->lock_mutex();  sendPrepTrans(forceSend);  if ((minNoOfEventsToWakeup <= 0) ||      ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {    minNoOfEventsToWakeup = theNoOfSentTransactions;  }//if  if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&      (aMillisecondNumber > 0)) {    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);    tNoCompletedTransactions = pollCompleted(tConArray);  } else {    tNoCompletedTransactions = pollCompleted(tConArray);  }//if  TransporterFacade::instance()->unlock_mutex();  reportCallback(tConArray, tNoCompletedTransactions);  return tNoCompletedTransactions;}//Ndb::sendPollNdb()/*****************************************************************************int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup);Remark:   Check if there are any transactions already completed. Wait for not          completed transactions until the specified number have completed or          until the timeout has occured. Timeout zero means no waiting time.******************************************************************************/int	Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup){  NdbTransaction* tConArray[1024];  Uint32         tNoCompletedTransactions;  //theCurrentConnectCounter = 0;  //theCurrentConnectIndex++;  TransporterFacade::instance()->lock_mutex();  if ((minNoOfEventsToWakeup == 0) ||      ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {    minNoOfEventsToWakeup = theNoOfSentTransactions;  }//if  if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&      (aMillisecondNumber > 0)) {    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);    tNoCompletedTransactions = pollCompleted(tConArray);  } else {    tNoCompletedTransactions = pollCompleted(tConArray);  }//if  TransporterFacade::instance()->unlock_mutex();  reportCallback(tConArray, tNoCompletedTransactions);  return tNoCompletedTransactions;}//Ndb::sendPollNdbWithoutWait()/*****************************************************************************int receiveOptimisedResponse();Return:  0 - Response received        -1 - Timeout occured waiting for response        -2 - Node failure interupted wait for response******************************************************************************/int	Ndb::receiveResponse(int waitTime){  int tResultCode;  TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);    theImpl->theWaiter.wait(waitTime);    if(theImpl->theWaiter.m_state == NO_WAIT) {    tResultCode = 0;  } else {#ifdef VM_TRACE    ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";    ndbout << theImpl->theWaiter.m_state << endl;#endif    if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){      tResultCode = -2;    } else {      tResultCode = -1;    }    theImpl->theWaiter.m_state = NO_WAIT;  }  return tResultCode;}//Ndb::receiveResponse()intNdb::sendRecSignal(Uint16 node_id,		   Uint32 aWaitState,		   NdbApiSignal* aSignal,                   Uint32 conn_seq){  /*  In most situations 0 is returned.  In error cases we have 5 different cases  -1: Send ok, time out in waiting for reply  -2: Node has failed  -3: Send buffer not full, send failed yet  -4: Send buffer full  -5: Node is currently stopping  */  int return_code;  TransporterFacade* tp = TransporterFacade::instance();  Uint32 send_size = 1; // Always sends one signal only   tp->lock_mutex();  // Protected area  if ((tp->get_node_alive(node_id)) &&      ((tp->getNodeSequence(node_id) == conn_seq) ||       (conn_seq == 0))) {    if (tp->check_send_size(node_id, send_size)) {      return_code = tp->sendSignal(aSignal, node_id);      if (return_code != -1) {        theImpl->theWaiter.m_node = node_id;        theImpl->theWaiter.m_state = aWaitState;        return_code = receiveResponse();      } else {	return_code = -3;      }    } else {      return_code = -4;    }//if  } else {    if ((tp->get_node_stopping(node_id)) &&        ((tp->getNodeSequence(node_id) == conn_seq) ||         (conn_seq == 0))) {      return_code = -5;    } else {      return_code = -2;    }//if  }//if  tp->unlock_mutex();  // End of protected area  return return_code;}//Ndb::sendRecSignal()voidNdbTransaction::sendTC_COMMIT_ACK(NdbApiSignal * aSignal,				 Uint32 transId1, Uint32 transId2, 				 Uint32 aTCRef){#ifdef MARKER_TRACE  ndbout_c("Sending TC_COMMIT_ACK(0x%.8x, 0x%.8x) to -> %d",	   transId1,	   transId2,	   refToNode(aTCRef));#endif    TransporterFacade *tp = TransporterFacade::instance();  aSignal->theTrace                = TestOrd::TraceAPI;  aSignal->theReceiversBlockNumber = DBTC;  aSignal->theVerId_signalNumber   = GSN_TC_COMMIT_ACK;  aSignal->theLength               = 2;  Uint32 * dataPtr = aSignal->getDataPtrSend();  dataPtr[0] = transId1;  dataPtr[1] = transId2;    tp->sendSignalUnCond(aSignal, refToNode(aTCRef));}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -