📄 ndb.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

#include <ndb_global.h>

#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbEventOperation.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>

/****************************************************************************
NdbTransaction* doConnect(Uint32 tConNode);

Parameters:    tConNode: Preferred node id, or 0 for "no preference".
Return Value:  A connection object taken from theConnectionArray for the node
               that was reached, or NULL if no node could be used.
Remark:        Connect to any node which has no connection at the moment.
               The preferred node (if any) is tried first. After that either
               the cluster connection's node iterator is walked (when
               theImpl->m_optimized_node_selection is set) or the nodes in
               theImpl->theDBnodes are tried round robin, continuing after
               theImpl->theCurrentConnectIndex.
               On failure theError.code is set to 4006 if at least one
               NDB_connect() call returned a non-zero failure code, and to
               4009 if every call returned 0.
****************************************************************************/
NdbTransaction* Ndb::doConnect(Uint32 tConNode)
{
  Uint32 tNode;
  Uint32 tAnyAlive = 0;
  int TretCode= 0;

  DBUG_ENTER("Ndb::doConnect");

  if (tConNode != 0) {
    TretCode = NDB_connect(tConNode);
    if ((TretCode == 1) || (TretCode == 2)) {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
      DBUG_RETURN(getConnectedNdbTransaction(tConNode));
    } else if (TretCode != 0) {
      // NDB_connect() returns 0 only when the node is not alive, so any
      // other failure code means a live node was seen.
      tAnyAlive = 1;
    }//if
  }//if
//****************************************************************************
// We will connect to any node. Make sure that we have connections to all
// nodes.
//****************************************************************************
  if (theImpl->m_optimized_node_selection)
  {
    Ndb_cluster_connection_node_iter &node_iter= theImpl->m_node_iter;
    theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
    // get_next_node() returning 0 terminates the walk.
    while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
    {
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) || (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
        DBUG_RETURN(getConnectedNdbTransaction(tNode));
      } else if (TretCode != 0) {
        tAnyAlive= 1;
      }//if
      DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
                         tNode, TretCode, getNdbError().code,
                         getNdbError().message));
    }
  }
  else // just do a regular round robin
  {
    Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
    // Reference: the advanced index is stored back into theImpl so the next
    // call continues from the node after the one tried last.
    Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
    UintR Tcount = 0;
    do {
      theCurrentConnectIndex++;
      if (theCurrentConnectIndex >= tNoOfDbNodes)
        theCurrentConnectIndex = 0;

      Tcount++;
      tNode= theImpl->theDBnodes[theCurrentConnectIndex];
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) || (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
        DBUG_RETURN(getConnectedNdbTransaction(tNode));
      } else if (TretCode != 0) {
        tAnyAlive= 1;
      }//if
      DBUG_PRINT("info",("tried node %d TretCode %d", tNode, TretCode));
    } while (Tcount < tNoOfDbNodes);
  }
//****************************************************************************
// We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure.
//****************************************************************************
  if (tAnyAlive == 1) {
#ifdef VM_TRACE
    ndbout << "TretCode = " << TretCode << endl;
#endif
    theError.code = 4006;
  } else {
    theError.code = 4009;
  }//if
  DBUG_RETURN(NULL);
}

/****************************************************************************
int NDB_connect(Uint32 tNode);

Parameters:    tNode: Id of the node to connect to.
Return Value:  0  the transporter reports the node as not alive.
               1  a new connection was seized and stored in
                  theConnectionArray[tNode].
               2  theConnectionArray[tNode] already held a connection.
               3  the seize request could not be sent, or the response did
                  not leave the connection object in state Connected.
               4  no connection object or no signal object could be
                  obtained, or the signal could not be set up.
Remark:        Sends GSN_TCSEIZEREQ to the node and waits for the response.
****************************************************************************/
int
Ndb::NDB_connect(Uint32 tNode)
{
//****************************************************************************
// We will perform seize of a transaction record in DBTC in the specified node.
//***************************************************************************

  int tReturnCode;
  TransporterFacade *tp = TransporterFacade::instance();

  DBUG_ENTER("Ndb::NDB_connect");

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    DBUG_RETURN(0);
  }

  NdbTransaction * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    DBUG_RETURN(2);
  }

  NdbTransaction * tNdbCon = getNdbCon();  // Get free connection object.
  if (tNdbCon == NULL) {
    DBUG_RETURN(4);
  }//if
  NdbApiSignal* tSignal = getSignal();     // Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(4);
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    DBUG_RETURN(4);
  }//if
  tSignal->setData(tNdbCon->ptr2int(), 1);
//************************************************
// Set connection pointer as NdbTransaction object
//************************************************
  tSignal->setData(theMyRef, 2);                // Set my block reference
  tNdbCon->Status(NdbTransaction::Connecting);  // Set status to connecting
  Uint32 nodeSequence;
  { // send and receive signal
    Guard guard(tp->theMutexPtr);
    nodeSequence = tp->getNodeSequence(tNode);
    // Re-check liveness now that the transporter mutex is held.
    bool node_is_alive = tp->get_node_alive(tNode);
    if (node_is_alive) {
      tReturnCode = tp->sendSignal(tSignal, tNode);
      releaseSignal(tSignal);
      if (tReturnCode != -1) {
        theImpl->theWaiter.m_node = tNode;
        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
        tReturnCode = receiveResponse();
      }//if
    } else {
      releaseSignal(tSignal);
      tReturnCode = -1;
    }//if
  }
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);

    tNdbCon->setMyBlockReference(theMyRef);
    // Push the new connection on the front of the node's list.
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    DBUG_RETURN(1);
  } else {
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    // Log the status before releasing: the connection object must not be
    // inspected once it has been handed back through releaseNdbCon().
    DBUG_PRINT("info",
               ("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
                tReturnCode, tNdbCon->Status()));
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(3);
  }//if
}//Ndb::NDB_connect()

/****************************************************************************
NdbTransaction* getConnectedNdbTransaction(Uint32 nodeId);

Parameters:    nodeId: Node whose connection list is popped.
Return Value:  The first connection object of theConnectionArray[nodeId],
               unlinked from the list (its theNext is set to NULL).
Remark:        The list head is dereferenced without a check, so
               theConnectionArray[nodeId] must be non-NULL on entry.
               doConnect() only calls this after NDB_connect() returned
               1 or 2.
****************************************************************************/
NdbTransaction *
Ndb::getConnectedNdbTransaction(Uint32 nodeId){
  NdbTransaction* next = theConnectionArray[nodeId];
  theConnectionArray[nodeId] = next->theNext;
  next->theNext = NULL;

  return next;
}//Ndb::getConnectedNdbTransaction()

/*****************************************************************************
disconnect();

Remark:        Disconnect all connections to the database.
               Calls releaseConnectToNdb() on every connection object linked
               from theConnectionArray for the nodes in theImpl->theDBnodes,
               and then on every object linked from theTransactionList.
*****************************************************************************/
void
Ndb::doDisconnect()
{
  NdbTransaction* tNdbCon;
  CHECK_STATUS_MACRO_VOID;
  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
  DBUG_ENTER("Ndb::doDisconnect");

  Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  Uint8 *theDBnodes= theImpl->theDBnodes;
  DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
      // Fetch the successor first; the current object is handed to
      // releaseConnectToNdb() and is not touched afterwards.
      NdbTransaction* tmpNdbCon = tNdbCon;
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
    NdbTransaction* tmpNdbCon = tNdbCon;
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
  DBUG_VOID_RETURN;
}//Ndb::disconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Parameters:    timeout: Maximum time to wait, in seconds.
Return Value:  Returns 0 if the Ndb is ready within timeout seconds.
               Returns -1 otherwise, with theError.code set to
                 4256 if theInitState is not Initialised,
                 4269 if theNode is still 0 when the timeout expires,
                 4009 if the cluster connection's wait_until_ready() fails.
Remark:        Polls theNode every 100 ms until it is non-zero, then passes
               the remaining seconds to
               theImpl->m_ndb_cluster_connection.wait_until_ready().
*****************************************************************************/
int
Ndb::waitUntilReady(int timeout)
{
  DBUG_ENTER("Ndb::waitUntilReady");
  int secondsCounter = 0;
  int milliCounter = 0;

  if (theInitState != Initialised) {
    // Ndb::init is not called
    theError.code = 4256;
    DBUG_RETURN(-1);
  }

  while (theNode == 0) {
    if (secondsCounter >= timeout)
    {
      theError.code = 4269;
      DBUG_RETURN(-1);
    }
    NdbSleep_MilliSleep(100);
    milliCounter += 100;
    if (milliCounter >= 1000) {
      secondsCounter++;
      milliCounter = 0;
    }//if
  }

  if (theImpl->m_ndb_cluster_connection.wait_until_ready
      (timeout-secondsCounter,30) < 0)
  {
    theError.code = 4009;
    DBUG_RETURN(-1);
  }

  DBUG_RETURN(0);
}

/*****************************************************************************
NdbTransaction* startTransaction(const NdbDictionary::Table *table,
                                 const char * keyData, Uint32 keyLen);

Parameters:    table:   Table the key belongs to, or 0.
               keyData: Key bytes used to pick a node, or 0.
               keyLen:  Length of keyData in bytes.
Return Value:  Returns a pointer to a connection object.
               Return NULL otherwise (also when theInitState is not
               Initialised).
Remark:        Start transaction. Synchronous.
               When both table and keyData are given, the key is hashed with
               md5_hash() and the first node returned by the table's
               get_nodes() is passed to startTransactionLocal() as the
               preferred node. In every other case node id 0 is passed.
*****************************************************************************/
NdbTransaction*
Ndb::startTransaction(const NdbDictionary::Table *table,
                      const char * keyData, Uint32 keyLen)
{
  DBUG_ENTER("Ndb::startTransaction");

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();
    /**
     * If the user supplied key data
     * We will make a qualified guess to which node is the primary for the
     * the fragment and contact that node
     */
    Uint32 nodeId = 0;
    NdbTableImpl* impl;
    if(table != 0 && keyData != 0 && (impl= &NdbTableImpl::getImpl(*table)))
    {
      Uint32 buf[4];
      bool haveHash = true;
      if((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0)
      {
        // Key is 8-byte aligned and a whole number of 32-bit words:
        // hash it in place.
        md5_hash(buf, (const Uint64*)keyData, keyLen >> 2);
      }
      else
      {
        Uint64 tmp[1000];
        if (keyLen < sizeof(tmp))
        {
          // Zero the last 64-bit word that the copy touches so the padding
          // bytes hashed after the key are well defined.
          tmp[keyLen/8] = 0;
          memcpy(tmp, keyData, keyLen);
          md5_hash(buf, tmp, (keyLen+3) >> 2);
        }
        else
        {
          // The key does not fit in the scratch buffer. The node is only a
          // hint, so start the transaction without one instead of writing
          // past the end of tmp.
          haveHash = false;
        }
      }
      if (haveHash)
      {
        const Uint32 hashValue= buf[1];
        const Uint16 *nodes;
        Uint32 cnt= impl->get_nodes(hashValue, &nodes);
        if(cnt)
          nodeId= nodes[0];
      }
    }

    {
      NdbTransaction *trans= startTransactionLocal(0, nodeId);
      DBUG_PRINT("exit",("start trans: 0x%x transid: 0x%llx",
                         trans, trans ? trans->getTransactionId() : 0));
      DBUG_RETURN(trans);
    }
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::startTransaction()

/*****************************************************************************
NdbTransaction* hupp(NdbTransaction* pBuddyTrans);

Parameters:    pBuddyTrans: Existing transaction, or NULL.
Return Value:  Returns a pointer to a connection object.
               Connected to the same node as pBuddyTrans
               and also using the same transaction id.
               Returns NULL if theInitState is not Initialised, if no
               connection could be started, or if the connection obtained is
               to a different node than pBuddyTrans (theError.code is then
               set to 4006).
Remark:        Start transaction. Synchronous.
               With pBuddyTrans == NULL this is startTransaction().
*****************************************************************************/
NdbTransaction*
Ndb::hupp(NdbTransaction* pBuddyTrans)
{
  DBUG_ENTER("Ndb::hupp");

  DBUG_PRINT("enter", ("trans: 0x%x",pBuddyTrans));

  Uint32 aPriority = 0;
  if (pBuddyTrans == NULL){
    DBUG_RETURN(startTransaction());
  }

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();

    Uint32 nodeId = pBuddyTrans->getConnectedNodeId();
    NdbTransaction* pCon = startTransactionLocal(aPriority, nodeId);
    if(pCon == NULL)
    {
      DBUG_RETURN(NULL);
    }

    if (pCon->getConnectedNodeId() != nodeId){
      // We could not get a connection to the desired node
      // release the connection and return NULL
      closeTransaction(pCon);
      theError.code = 4006;
      DBUG_RETURN(NULL);
    }
    pCon->setTransactionId(pBuddyTrans->getTransactionId());
    pCon->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
    DBUG_PRINT("exit", ("hupp trans: 0x%x transid: 0x%llx",
                        pCon, pCon ? pCon->getTransactionId() : 0));
    DBUG_RETURN(pCon);
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::hupp()

NdbTransaction*
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
{
#ifdef VM_TRACE
  char buf[255];
  const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
  if(val != 0){
    nodeId = atoi(val);
  }
#endif

  DBUG_ENTER("Ndb::startTransactionLocal");
  DBUG_PRINT("enter", ("nodeid: %d", nodeId));

  if(unlikely(theRemainingStartTransactions == 0))
  {
    theError.code = 4006;
    DBUG_RETURN(0);
  }

  NdbTransaction* tConnection;
  Uint64 tFirstTransId = theFirstTransId;
  tConnection = doConnect(nodeId);
  if (tConnection == NULL) {
    DBUG_RETURN(NULL);
  }//if

  theRemainingStartTransactions--;
  NdbTransaction* tConNext = theTransactionList;
  tConnection->init();
  theTransactionList = tConnection;        // into a transaction list.
  tConnection->next(tConNext);   // Add the active connection object
  tConnection->setTransactionId(tFirstTransId);
  tConnection->thePriority = aPriority;
  if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
    //---------------------------------------------------
// Transaction id rolling round. We will start from
// consecutive identity 0 again.
//---------------------------------------------------
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -