📄 ndbif.cpp
字号:
Uint32 i; if (aNoOfCompletedTrans > 0) { for (i = 0; i < aNoOfCompletedTrans; i++) { void* anyObject = aCopyArray[i]->theCallbackObject; NdbAsynchCallback aCallback = aCopyArray[i]->theCallbackFunction; int tResult = 0; if (aCallback != NULL) { if (aCopyArray[i]->theReturnStatus == NdbTransaction::ReturnFailure) { tResult = -1; }//if (*aCallback)(tResult, aCopyArray[i], anyObject); }//if }//for }//if}//Ndb::reportCallback()/*****************************************************************************Uint32 pollCompleted(NdbTransaction** aCopyArray);Remark: Transfer the data from the completed transaction to a local array. This support is used by a number of the poll-methods.******************************************************************************/Uint32 Ndb::pollCompleted(NdbTransaction** aCopyArray){ check_send_timeout(); Uint32 i; Uint32 tNoCompletedTransactions = theNoOfCompletedTransactions; if (tNoCompletedTransactions > 0) { for (i = 0; i < tNoCompletedTransactions; i++) { aCopyArray[i] = theCompletedTransactionsArray[i]; if (aCopyArray[i]->theListState != NdbTransaction::InCompletedList) { ndbout << "pollCompleted error "; ndbout << (int) aCopyArray[i]->theListState << endl; abort(); }//if theCompletedTransactionsArray[i] = NULL; aCopyArray[i]->theListState = NdbTransaction::NotInList; }//for }//if theNoOfCompletedTransactions = 0; return tNoCompletedTransactions;}//Ndb::pollCompleted()voidNdb::check_send_timeout(){ Uint32 timeout = TransporterFacade::instance()->m_waitfor_timeout; NDB_TICKS current_time = NdbTick_CurrentMillisecond(); if (current_time - the_last_check_time > 1000) { the_last_check_time = current_time; Uint32 no_of_sent = theNoOfSentTransactions; for (Uint32 i = 0; i < no_of_sent; i++) { NdbTransaction* a_con = theSentTransactionsArray[i]; if ((current_time - a_con->theStartTransTime) > timeout) {#ifdef VM_TRACE a_con->printState(); Uint32 t1 = a_con->theTransactionId; Uint32 t2 = a_con->theTransactionId >> 32; ndbout_c("4012 [%.8x %.8x]", t1, t2); //abort();#endif a_con->theReleaseOnClose = true; a_con->setOperationErrorCodeAbort(4012); a_con->theCommitStatus = NdbTransaction::NeedAbort; a_con->theCompletionStatus = NdbTransaction::CompletedFailure; a_con->handleExecuteCompletion(); remove_sent_list(i); insert_completed_list(a_con); no_of_sent--; i--; }//if }//for }//if}voidNdb::remove_sent_list(Uint32 list_index){ Uint32 last_index = theNoOfSentTransactions - 1; if (list_index < last_index) { NdbTransaction* t_con = theSentTransactionsArray[last_index]; theSentTransactionsArray[list_index] = t_con; }//if theNoOfSentTransactions = last_index; theSentTransactionsArray[last_index] = 0;}Uint32Ndb::insert_completed_list(NdbTransaction* a_con){ Uint32 no_of_comp = theNoOfCompletedTransactions; theCompletedTransactionsArray[no_of_comp] = a_con; theNoOfCompletedTransactions = no_of_comp + 1; a_con->theListState = NdbTransaction::InCompletedList; a_con->theTransArrayIndex = no_of_comp; return no_of_comp;}Uint32Ndb::insert_sent_list(NdbTransaction* a_con){ Uint32 no_of_sent = theNoOfSentTransactions; theSentTransactionsArray[no_of_sent] = a_con; theNoOfSentTransactions = no_of_sent + 1; a_con->theListState = NdbTransaction::InSendList; a_con->theTransArrayIndex = no_of_sent; return no_of_sent;}/*****************************************************************************void sendPrepTrans(int forceSend);Remark: Send a batch of transactions prepared for sending to the NDB kernel. ******************************************************************************/voidNdb::sendPrepTrans(int forceSend){ // Always called when holding mutex on TransporterFacade /* We will send a list of transactions to the NDB kernel. Before sending we check the following. 1) Node connected to is still alive Checked by both checking node status and node sequence 2) Send buffer can handle the size of messages we are planning to send So far this is just a fake check but will soon be a real check When the connected node has failed we abort the transaction without responding anymore to the node since the kernel will clean up automatically. When sendBuffer cannot handle anymore messages then we will also abort transaction but by communicating to the kernel since it is still alive and we keep a small space for messages like that. */ Uint32 i; TransporterFacade* tp = TransporterFacade::instance(); Uint32 no_of_prep_trans = theNoOfPreparedTransactions; for (i = 0; i < no_of_prep_trans; i++) { NdbTransaction * a_con = thePreparedTransactionsArray[i]; thePreparedTransactionsArray[i] = NULL; Uint32 node_id = a_con->getConnectedNodeId(); if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) && tp->get_node_alive(node_id) || (tp->get_node_stopping(node_id) && ((a_con->theSendStatus == NdbTransaction::sendABORT) || (a_con->theSendStatus == NdbTransaction::sendABORTfail) || (a_con->theSendStatus == NdbTransaction::sendCOMMITstate) || (a_con->theSendStatus == NdbTransaction::sendCompleted)))) { /* We will send if 1) Node is alive and sequences are correct OR 2) Node is stopping and we only want to commit or abort In a graceful stop situation we want to ensure quick aborts of all transactions and commits and thus we allow aborts and commits to continue but not normal operations. */ if (tp->check_send_size(node_id, a_con->get_send_size())) { if (a_con->doSend() == 0) { NDB_TICKS current_time = NdbTick_CurrentMillisecond(); a_con->theStartTransTime = current_time; continue; } else { /* Although all precautions we did not manage to send the operations Must have been a dropped connection on the transporter side. We don't expect to be able to continue using this connection so we will treat it as a node failure. */ TRACE_DEBUG("Send problem even after checking node status"); }//if } else { /* The send buffer is currently full or at least close to. We will not allow a send to continue. We will set the connection so that it is indicated that we need to abort the transaction. If we were trying to commit or abort and got a send buffer we will not try again and will thus set the state to Aborted to avoid a more or less eternal loop of tries. */ if (a_con->theSendStatus == NdbTransaction::sendOperations) { a_con->setOperationErrorCodeAbort(4021); a_con->theCommitStatus = NdbTransaction::NeedAbort; TRACE_DEBUG("Send buffer full and sendOperations"); } else { a_con->setOperationErrorCodeAbort(4026); a_con->theCommitStatus = NdbTransaction::Aborted; TRACE_DEBUG("Send buffer full, set state to Aborted"); }//if }//if } else {#ifdef VM_TRACE a_con->printState();#endif if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) && tp->get_node_stopping(node_id)) { /* The node we are connected to is currently in an early stopping phase of a graceful stop. We will not send the prepared transactions. We will simply refuse and let the application code handle the abort. */ TRACE_DEBUG("Abort a transaction when stopping a node"); a_con->setOperationErrorCodeAbort(4023); a_con->theCommitStatus = NdbTransaction::NeedAbort; } else { /* The node is hard dead and we cannot continue. We will also release the connection to the free pool. */ TRACE_DEBUG("The node was stone dead, inform about abort"); a_con->setOperationErrorCodeAbort(4025); a_con->theReleaseOnClose = true; a_con->theTransactionIsStarted = false; a_con->theCommitStatus = NdbTransaction::Aborted; }//if }//if a_con->theReturnStatus = NdbTransaction::ReturnFailure; a_con->theCompletionStatus = NdbTransaction::CompletedFailure; a_con->handleExecuteCompletion(); insert_completed_list(a_con); }//for theNoOfPreparedTransactions = 0; if (forceSend == 0) { tp->checkForceSend(theNdbBlockNumber); } else if (forceSend == 1) { tp->forceSend(theNdbBlockNumber); }//if return;}//Ndb::sendPrepTrans()/*****************************************************************************void waitCompletedTransactions(int aMilliSecondsToWait, int noOfEventsToWaitFor);Remark: First send all prepared operations and then check if there are any transactions already completed. Do not wait for not completed transactions.******************************************************************************/void Ndb::waitCompletedTransactions(int aMilliSecondsToWait, int noOfEventsToWaitFor){ theImpl->theWaiter.m_state = NO_WAIT; /** * theImpl->theWaiter.m_state = NO_WAIT; * To ensure no messup with synchronous node fail handling * (see ReportFailure) */ int waitTime = aMilliSecondsToWait; NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + (NDB_TICKS)waitTime; theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; do { if (waitTime < 1000) waitTime = 1000; NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition, (NdbMutex*)theImpl->theWaiter.m_mutex, waitTime); if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) { break; }//if theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; waitTime = (int)(maxTime - NdbTick_CurrentMillisecond()); } while (waitTime > 0); return;}//Ndb::waitCompletedTransactions()/*****************************************************************************void sendPreparedTransactions(int forceSend = 0);Remark: First send all prepared operations and then check if there are any transactions already completed. Do not wait for not completed transactions.******************************************************************************/void Ndb::sendPreparedTransactions(int forceSend){ TransporterFacade::instance()->lock_mutex(); sendPrepTrans(forceSend); TransporterFacade::instance()->unlock_mutex(); return;}//Ndb::sendPreparedTransactions()/*****************************************************************************int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup = 1, int forceSend = 0);Remark: First send all prepared operations and then check if there are any transactions already completed. Wait for not completed transactions until the specified number have completed or until the timeout has occured. Timeout zero means no waiting time.******************************************************************************/int Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend){ NdbTransaction* tConArray[1024]; Uint32 tNoCompletedTransactions; //theCurrentConnectCounter = 0; //theCurrentConnectIndex++; TransporterFacade::instance()->lock_mutex(); sendPrepTrans(forceSend); if ((minNoOfEventsToWakeup <= 0) || ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) { minNoOfEventsToWakeup = theNoOfSentTransactions; }//if if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) && (aMillisecondNumber > 0)) { waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup); tNoCompletedTransactions = pollCompleted(tConArray); } else { tNoCompletedTransactions = pollCompleted(tConArray); }//if TransporterFacade::instance()->unlock_mutex(); reportCallback(tConArray, tNoCompletedTransactions); return tNoCompletedTransactions;}//Ndb::sendPollNdb()/*****************************************************************************int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup);Remark: Check if there are any transactions already completed. Wait for not completed transactions until the specified number have completed or until the timeout has occured. Timeout zero means no waiting time.******************************************************************************/int Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup){ NdbTransaction* tConArray[1024]; Uint32 tNoCompletedTransactions; //theCurrentConnectCounter = 0; //theCurrentConnectIndex++; TransporterFacade::instance()->lock_mutex(); if ((minNoOfEventsToWakeup == 0) || ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) { minNoOfEventsToWakeup = theNoOfSentTransactions; }//if if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) && (aMillisecondNumber > 0)) { waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup); tNoCompletedTransactions = pollCompleted(tConArray); } else { tNoCompletedTransactions = pollCompleted(tConArray); }//if TransporterFacade::instance()->unlock_mutex(); reportCallback(tConArray, tNoCompletedTransactions); return tNoCompletedTransactions;}//Ndb::sendPollNdbWithoutWait()/*****************************************************************************int receiveOptimisedResponse();Return: 0 - Response received -1 - Timeout occured waiting for response -2 - Node failure interupted wait for response******************************************************************************/int Ndb::receiveResponse(int waitTime){ int tResultCode; TransporterFacade::instance()->checkForceSend(theNdbBlockNumber); theImpl->theWaiter.wait(waitTime); if(theImpl->theWaiter.m_state == NO_WAIT) { tResultCode = 0; } else {#ifdef VM_TRACE ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = "; ndbout << theImpl->theWaiter.m_state << endl;#endif if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){ tResultCode = -2; } else { tResultCode = -1; } theImpl->theWaiter.m_state = NO_WAIT; } return tResultCode;}//Ndb::receiveResponse()intNdb::sendRecSignal(Uint16 node_id, Uint32 aWaitState, NdbApiSignal* aSignal, Uint32 conn_seq){ /* In most situations 0 is returned. In error cases we have 5 different cases -1: Send ok, time out in waiting for reply -2: Node has failed -3: Send buffer not full, send failed yet -4: Send buffer full -5: Node is currently stopping */ int return_code; TransporterFacade* tp = TransporterFacade::instance(); Uint32 send_size = 1; // Always sends one signal only tp->lock_mutex(); // Protected area if ((tp->get_node_alive(node_id)) && ((tp->getNodeSequence(node_id) == conn_seq) || (conn_seq == 0))) { if (tp->check_send_size(node_id, send_size)) { return_code = tp->sendSignal(aSignal, node_id); if (return_code != -1) { theImpl->theWaiter.m_node = node_id; theImpl->theWaiter.m_state = aWaitState; return_code = receiveResponse(); } else { return_code = -3; } } else { return_code = -4; }//if } else { if ((tp->get_node_stopping(node_id)) && ((tp->getNodeSequence(node_id) == conn_seq) || (conn_seq == 0))) { return_code = -5; } else { return_code = -2; }//if }//if tp->unlock_mutex(); // End of protected area return return_code;}//Ndb::sendRecSignal()voidNdbTransaction::sendTC_COMMIT_ACK(NdbApiSignal * aSignal, Uint32 transId1, Uint32 transId2, Uint32 aTCRef){#ifdef MARKER_TRACE ndbout_c("Sending TC_COMMIT_ACK(0x%.8x, 0x%.8x) to -> %d", transId1, transId2, refToNode(aTCRef));#endif TransporterFacade *tp = TransporterFacade::instance(); aSignal->theTrace = TestOrd::TraceAPI; aSignal->theReceiversBlockNumber = DBTC; aSignal->theVerId_signalNumber = GSN_TC_COMMIT_ACK; aSignal->theLength = 2; Uint32 * dataPtr = aSignal->getDataPtrSend(); dataPtr[0] = transId1; dataPtr[1] = transId2; tp->sendSignalUnCond(aSignal, refToNode(aTCRef));}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -