⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndb.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

#include <ndb_global.h>

#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbEventOperation.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>

/****************************************************************************
void connect();

Connect to any node which has no connection at the moment.

Parameters:     tConNode  Preferred node id. 0 means "no preference"; a
                          non-zero value is tried first and, if that fails,
                          the remaining nodes are tried as well.
Return Value:   A connected NdbTransaction taken from the per-node list of
                the node that answered, or NULL when no node could be
                used. On NULL, theError.code is set to 4006 (connection
                failure) if at least one NDB_connect() attempt returned
                something other than 0, else to 4009 (cluster failure).
****************************************************************************/
NdbTransaction* Ndb::doConnect(Uint32 tConNode)
{
  Uint32        tNode;
  Uint32        tAnyAlive = 0;
  int TretCode= 0;

  DBUG_ENTER("Ndb::doConnect");

  if (tConNode != 0) {
    TretCode = NDB_connect(tConNode);
    if ((TretCode == 1) || (TretCode == 2)) {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
      DBUG_RETURN(getConnectedNdbTransaction(tConNode));
    } else if (TretCode != 0) {
      // Any non-zero code means the node was reported alive even though
      // no connection could be obtained.
      tAnyAlive = 1;
    }//if
  }//if
//****************************************************************************
// We will connect to any node. Make sure that we have connections to all
// nodes.
//****************************************************************************
  if (theImpl->m_optimized_node_selection)
  {
    // Walk the nodes in the order handed out by the cluster connection's
    // node iterator.
    Ndb_cluster_connection_node_iter &node_iter=
      theImpl->m_node_iter;
    theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
    while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
    {
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) ||
          (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
        DBUG_RETURN(getConnectedNdbTransaction(tNode));
      } else if (TretCode != 0) {
        tAnyAlive= 1;
      }//if
      DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
                         tNode, TretCode, getNdbError().code,
                         getNdbError().message));
    }
  }
  else // just do a regular round robin
  {
    Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
    Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
    UintR Tcount = 0;
    do {
      // Advance the shared cursor first so that successive calls start
      // from different nodes; wrap around at the end of the node table.
      theCurrentConnectIndex++;
      if (theCurrentConnectIndex >= tNoOfDbNodes)
        theCurrentConnectIndex = 0;

      Tcount++;
      tNode= theImpl->theDBnodes[theCurrentConnectIndex];
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) ||
          (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
        DBUG_RETURN(getConnectedNdbTransaction(tNode));
      } else if (TretCode != 0) {
        tAnyAlive= 1;
      }//if
      DBUG_PRINT("info",("tried node %d TretCode %d", tNode, TretCode));
    } while (Tcount < tNoOfDbNodes);
  }
//****************************************************************************
// We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure.
//****************************************************************************
  if (tAnyAlive == 1) {
#ifdef VM_TRACE
    ndbout << "TretCode = " << TretCode << endl;
#endif
    theError.code = 4006;
  } else {
    theError.code = 4009;
  }//if
  DBUG_RETURN(NULL);
}

/****************************************************************************
int NDB_connect(Uint32 tNode);

Seize a transaction record in DBTC on node tNode and, on success, push the
resulting NdbTransaction onto theConnectionArray[tNode].

Return Value:   0  the transporter reports the node as not alive
                1  a new connection was seized and stored for the node
                2  the node already has a connection in theConnectionArray
                3  the seize request was sent (or attempted) but failed
                4  no free NdbTransaction / NdbApiSignal object, or the
                   signal could not be initialised
****************************************************************************/
int
Ndb::NDB_connect(Uint32 tNode)
{
//****************************************************************************
// We will perform seize of a transaction record in DBTC in the specified node.
//***************************************************************************

  int            tReturnCode;
  TransporterFacade *tp = TransporterFacade::instance();

  DBUG_ENTER("Ndb::NDB_connect");

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    DBUG_RETURN(0);
  }

  NdbTransaction * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    DBUG_RETURN(2);
  }

  NdbTransaction * tNdbCon = getNdbCon();       // Get free connection object.
  if (tNdbCon == NULL) {
    DBUG_RETURN(4);
  }//if
  NdbApiSignal* tSignal = getSignal();          // Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(4);
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    DBUG_RETURN(4);
  }//if
//************************************************
// Set connection pointer as NdbTransaction object
//************************************************
  tSignal->setData(tNdbCon->ptr2int(), 1);
  tSignal->setData(theMyRef, 2);        // Set my block reference
  tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
  Uint32 nodeSequence;
  { // send and receive signal
    Guard guard(tp->theMutexPtr);
    nodeSequence = tp->getNodeSequence(tNode);
    // Re-check liveness now that the guard is held; the first check above
    // was made without it.
    bool node_is_alive = tp->get_node_alive(tNode);
    if (node_is_alive) {
      tReturnCode = tp->sendSignal(tSignal, tNode);
      releaseSignal(tSignal);
      if (tReturnCode != -1) {
        theImpl->theWaiter.m_node = tNode;
        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
        tReturnCode = receiveResponse();
      }//if
    } else {
      releaseSignal(tSignal);
      tReturnCode = -1;
    }//if
  }
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);

    tNdbCon->setMyBlockReference(theMyRef);
    // Push the new connection at the head of the node's list.
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    DBUG_RETURN(1);
  } else {
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    // Log while tNdbCon is still ours: its status must be read before the
    // object is handed back with releaseNdbCon().
    DBUG_PRINT("info",
               ("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
                tReturnCode, tNdbCon->Status()));
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(3);
  }//if
}//Ndb::NDB_connect()

/****************************************************************************
NdbTransaction* getConnectedNdbTransaction(Uint32 nodeId);

Detach and return the first NdbTransaction from theConnectionArray[nodeId].
The list head is dereferenced without a NULL check, so the caller must only
call this for a node whose list is non-empty (doConnect() does so right
after NDB_connect() returned 1 or 2).
****************************************************************************/
NdbTransaction *
Ndb::getConnectedNdbTransaction(Uint32 nodeId){
  NdbTransaction* next = theConnectionArray[nodeId];
  theConnectionArray[nodeId] = next->theNext;
  next->theNext = NULL;

  return next;
}//Ndb::getConnectedNdbTransaction()

/*****************************************************************************
disconnect();

Remark:        Disconnect all connections to the database.
               Calls releaseConnectToNdb() on every NdbTransaction in the
               per-node lists of all known DB nodes and then on every entry
               of theTransactionList.
*****************************************************************************/
void
Ndb::doDisconnect()
{
  NdbTransaction* tNdbCon;
  CHECK_STATUS_MACRO_VOID;
  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
  DBUG_ENTER("Ndb::doDisconnect");

  Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  Uint8 *theDBnodes= theImpl->theDBnodes;
  DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
      // Fetch the successor before releasing the current element.
      NdbTransaction* tmpNdbCon = tNdbCon;
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
    NdbTransaction* tmpNdbCon = tNdbCon;
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
  DBUG_VOID_RETURN;
}//Ndb::disconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
Returns -1 otherwise.Remark:         Waits until a node has status != 0*****************************************************************************/ intNdb::waitUntilReady(int timeout){  DBUG_ENTER("Ndb::waitUntilReady");  int secondsCounter = 0;  int milliCounter = 0;  int noChecksSinceFirstAliveFound = 0;  int id;  if (theInitState != Initialised) {    // Ndb::init is not called    theError.code = 4256;    DBUG_RETURN(-1);  }  while (theNode == 0) {    if (secondsCounter >= timeout)    {      theError.code = 4269;      DBUG_RETURN(-1);    }    NdbSleep_MilliSleep(100);    milliCounter += 100;    if (milliCounter >= 1000) {      secondsCounter++;      milliCounter = 0;    }//if  }  if (theImpl->m_ndb_cluster_connection.wait_until_ready      (timeout-secondsCounter,30) < 0)  {    theError.code = 4009;    DBUG_RETURN(-1);  }  DBUG_RETURN(0);}/*****************************************************************************NdbTransaction* startTransaction();Return Value:   Returns a pointer to a connection object.                Return NULL otherwise.Remark:         Start transaction. 
Synchronous.*****************************************************************************/ NdbTransaction* Ndb::startTransaction(const NdbDictionary::Table *table,		      const char * keyData, Uint32 keyLen){  DBUG_ENTER("Ndb::startTransaction");  if (theInitState == Initialised) {    theError.code = 0;    checkFailedNode();    /**     * If the user supplied key data     * We will make a qualified quess to which node is the primary for the     * the fragment and contact that node     */    Uint32 nodeId;    NdbTableImpl* impl;    if(table != 0 && keyData != 0 && (impl= &NdbTableImpl::getImpl(*table)))     {      Uint32 hashValue;      {	Uint32 buf[4];	if((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0)	{	  md5_hash(buf, (const Uint64*)keyData, keyLen >> 2);	}	else	{	  Uint64 tmp[1000];	  tmp[keyLen/8] = 0;	  memcpy(tmp, keyData, keyLen);	  md5_hash(buf, tmp, (keyLen+3) >> 2);	  	}	hashValue= buf[1];      }      const Uint16 *nodes;      Uint32 cnt= impl->get_nodes(hashValue, &nodes);      if(cnt)	nodeId= nodes[0];      else	nodeId= 0;    } else {      nodeId = 0;    }//if    {      NdbTransaction *trans= startTransactionLocal(0, nodeId);      DBUG_PRINT("exit",("start trans: 0x%x transid: 0x%llx",			 trans, trans ? trans->getTransactionId() : 0));      DBUG_RETURN(trans);    }  } else {    DBUG_RETURN(NULL);  }//if}//Ndb::startTransaction()/*****************************************************************************NdbTransaction* hupp(NdbTransaction* pBuddyTrans);Return Value:   Returns a pointer to a connection object.                Connected to the same node as pBuddyTrans                and also using the same transction idRemark:         Start transaction. 
Synchronous.
                If pBuddyTrans is NULL this is the same as calling
                startTransaction() without arguments. Returns NULL when
                the Ndb object is not Initialised, when no transaction
                could be started, or (with error 4006) when the new
                transaction ended up on a different node than the buddy.
*****************************************************************************/ 
NdbTransaction* 
Ndb::hupp(NdbTransaction* pBuddyTrans)
{
  DBUG_ENTER("Ndb::hupp");

  // NOTE(review): a pointer is passed for a %x conversion here and in the
  // "exit" trace below - looks like a format/argument type mismatch on
  // platforms where pointers are wider than int; confirm and fix.
  DBUG_PRINT("enter", ("trans: 0x%x",pBuddyTrans));

  Uint32 aPriority = 0;
  if (pBuddyTrans == NULL){
    // No buddy given: behave like an ordinary transaction start.
    DBUG_RETURN(startTransaction());
  }

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();

    // Ask for a transaction on the node the buddy is connected to.
    Uint32 nodeId = pBuddyTrans->getConnectedNodeId();
    NdbTransaction* pCon = startTransactionLocal(aPriority, nodeId);
    if(pCon == NULL)
      DBUG_RETURN(NULL);

    if (pCon->getConnectedNodeId() != nodeId){
      // We could not get a connection to the desired node
      // release the connection and return NULL
      closeTransaction(pCon);
      theError.code = 4006;
      DBUG_RETURN(NULL);
    }
    // Share the buddy's transaction id and record its TC connect pointer.
    pCon->setTransactionId(pBuddyTrans->getTransactionId());
    pCon->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
    // pCon is known to be non-NULL here, so the ternary always takes its
    // first branch.
    DBUG_PRINT("exit", ("hupp trans: 0x%x transid: 0x%llx",
                        pCon, pCon ? pCon->getTransactionId() : 0));
    DBUG_RETURN(pCon);
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::hupp()

// Ndb::startTransactionLocal
// Obtains a connection through doConnect(nodeId), links it at the head of
// theTransactionList and stamps it with theFirstTransId and aPriority.
// Returns 0 with error 4006 when theRemainingStartTransactions is
// exhausted, and NULL when doConnect() fails.
// NOTE: the remainder of this function lies beyond the end of this excerpt.
NdbTransaction* 
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
{
#ifdef VM_TRACE
  // Trace builds only: the environment variable NDB_TRANSACTION_NODE_ID,
  // when set, overrides the node id requested by the caller.
  char buf[255];
  const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
  if(val != 0){
    nodeId = atoi(val);
  }
#endif

  DBUG_ENTER("Ndb::startTransactionLocal");
  DBUG_PRINT("enter", ("nodeid: %d", nodeId));

  // Refuse to start when the remaining-start budget is used up.
  if(unlikely(theRemainingStartTransactions == 0))
  {
    theError.code = 4006;
    DBUG_RETURN(0);
  }
  
  NdbTransaction* tConnection;
  Uint64 tFirstTransId = theFirstTransId;
  tConnection = doConnect(nodeId);
  if (tConnection == NULL) {
    DBUG_RETURN(NULL);
  }//if

  theRemainingStartTransactions--;
  NdbTransaction* tConNext = theTransactionList;

  tConnection->init();
  // Add the active connection object to the head of the transaction list;
  // the previous head becomes its successor.
  theTransactionList = tConnection;
  tConnection->next(tConNext);
  tConnection->setTransactionId(tFirstTransId);
  tConnection->thePriority = aPriority;

  // Low 32 bits of the id are all ones, i.e. the running counter part has
  // reached its maximum.
  if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
    //---------------------------------------------------
// Transaction id rolling round. We will start from
// consecutive identity 0 again.
//---------------------------------------------------

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -