⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clustermgr.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include <ndb_version.h>

#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
#include "NdbApiSignal.hpp"
#include "API.hpp"
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <NdbTick.h>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/ApiRegSignalData.hpp>

#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>

/**
 * When non-zero, the heartbeat loop in ClusterMgr::threadMain() sends a
 * heartbeat on its next pass regardless of the per-node timers, and then
 * resets the flag to 0.
 */
int global_flag_send_heartbeat_now= 0;

/**
 * Just a C wrapper for threadMain.
 *
 * @param me  the ClusterMgr instance whose threadMain() is to be run
 * @return    always NULL
 */
extern "C"
void*
runClusterMgr_C(void * me)
{
  ((ClusterMgr*) me)->threadMain();
  /**
   * Sleep to allow another thread that is not exiting to take control
   * of signals allocated by this thread
   *
   * see Ndb::~Ndb() in Ndbinit.cpp
   */
#ifdef NDB_OSE
  NdbSleep_MilliSleep(50);
#endif
  return NULL;
}

extern "C" {
  void ndbSetOwnVersion();
}

/**
 * Construct a cluster manager bound to the given transporter facade.
 *
 * Creates the mutex guarding thread start/stop and zeroes the node
 * counters. The heartbeat thread is not started here; see startThread().
 */
ClusterMgr::ClusterMgr(TransporterFacade & _facade):
  theStop(0),
  theFacade(_facade)
{
  DBUG_ENTER("ClusterMgr::ClusterMgr");
  ndbSetOwnVersion();
  clusterMgrThreadMutex = NdbMutex_Create();
  noOfAliveNodes= 0;
  noOfConnectedNodes= 0;
  theClusterMgrThread= 0;
  DBUG_VOID_RETURN;
}

/**
 * Stop the heartbeat thread (if any) and release the thread mutex.
 */
ClusterMgr::~ClusterMgr(){
  DBUG_ENTER("ClusterMgr::~ClusterMgr");
  doStop();
  NdbMutex_Destroy(clusterMgrThreadMutex);
  DBUG_VOID_RETURN;
}

/**
 * Populate theNodes[] from the configuration.
 *
 * For every configuration entry that yields a node id, the node is marked
 * as defined and its NodeInfo type is set from the section type. Entries
 * for which iter.get() returns non-zero are skipped.
 *
 * @param iter  configuration iterator; it is rewound with first() and
 *              walked to the end
 */
void
ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
  for(iter.first(); iter.valid(); iter.next()){
    Uint32 tmp = 0;
    if(iter.get(CFG_NODE_ID, &tmp))
      continue;

    theNodes[tmp].defined = true;
#if 0
    ndbout << "--------------------------------------" << endl;
    ndbout << "--------------------------------------" << endl;
    ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
#endif

    unsigned type;
    if(iter.get(CFG_TYPE_OF_SECTION, &type))
      continue;

    switch(type){
    case NODE_TYPE_DB:
      theNodes[tmp].m_info.m_type = NodeInfo::DB;
      break;
    case NODE_TYPE_API:
      theNodes[tmp].m_info.m_type = NodeInfo::API;
      break;
    case NODE_TYPE_MGM:
      theNodes[tmp].m_info.m_type = NodeInfo::MGM;
      break;
    case NODE_TYPE_REP:
      theNodes[tmp].m_info.m_type = NodeInfo::REP;
      break;
    case NODE_TYPE_EXT_REP:
      theNodes[tmp].m_info.m_type = NodeInfo::REP;
      {
        // Fixed heartbeat interval for external REP nodes; the lookup of a
        // configured value is disabled below.
        Uint32 hbFreq = 10000;
        //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq);
        theNodes[tmp].hbFrequency = hbFreq;
        assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000);
      }
      break;
    default:
      // No-op statement; unknown node types are silently ignored.
      type = type;
#if 0
      ndbout_c("ClusterMgr: Unknown node type: %d", type);
#endif
    }
  }
}

/**
 * Start the heartbeat thread.
 *
 * Clears the stop flag and creates a low-priority thread named
 * "ndb_clustermgr" that runs runClusterMgr_C(this). Done while holding
 * clusterMgrThreadMutex.
 */
void
ClusterMgr::startThread() {
  NdbMutex_Lock(clusterMgrThreadMutex);

  theStop = 0;

  theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
                                         (void**)this,
                                         32768,
                                         "ndb_clustermgr",
                                         NDB_THREAD_PRIO_LOW);
  NdbMutex_Unlock(clusterMgrThreadMutex);
}

/**
 * Ask the heartbeat thread to stop and wait for it to finish.
 *
 * Returns immediately if the stop flag is already set. Otherwise sets
 * theStop, joins the thread (if one was created) and destroys its handle.
 * The whole operation runs under clusterMgrThreadMutex.
 */
void
ClusterMgr::doStop( ){
  DBUG_ENTER("ClusterMgr::doStop");
  NdbMutex_Lock(clusterMgrThreadMutex);
  if(theStop){
    NdbMutex_Unlock(clusterMgrThreadMutex);
    DBUG_VOID_RETURN;
  }
  void *status;
  theStop = 1;
  if (theClusterMgrThread) {
    NdbThread_WaitFor(theClusterMgrThread, &status);
    NdbThread_Destroy(&theClusterMgrThread);
  }
  NdbMutex_Unlock(clusterMgrThreadMutex);
  DBUG_VOID_RETURN;
}

/**
 * Body of the heartbeat thread.
 *
 * Until theStop is set, roughly every 100 ms:
 *  - asks the facade to connect to every defined but unconnected node,
 *  - sends API_REGREQ to every connected, compatible node whose heartbeat
 *    timer has expired (or to all of them when
 *    global_flag_send_heartbeat_now was set),
 *  - calls reportNodeFailed() for a node once its m_heartbeat_cnt has
 *    reached 4 (the counter is reset in execAPI_REGCONF()).
 *
 * The per-node work is done while holding the facade mutex.
 */
void
ClusterMgr::threadMain( ){
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber   = GSN_API_REGREQ;
  signal.theReceiversBlockNumber = QMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegReq::SignalLength;

  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  req->version = NDB_VERSION;

  Uint32 timeSlept = 100;
  Uint64 now = NdbTick_CurrentMillisecond();

  while(!theStop){
    /**
     * Start of Secure area for use of Transporter
     */
    int send_heartbeat_now= global_flag_send_heartbeat_now;
    global_flag_send_heartbeat_now= 0;

    theFacade.lock_mutex();
    for (int i = 1; i < MAX_NODES; i++){
      /**
       * Send register request (heartbeat) to all available nodes
       * at specified timing intervals
       */
      const NodeId nodeId = i;
      Node & theNode = theNodes[nodeId];

      if (!theNode.defined)
        continue;

      if (theNode.connected == false){
        theFacade.doConnect(nodeId);
        continue;
      }

      if (!theNode.compatible){
        continue;
      }

      theNode.hbCounter += timeSlept;
      if (theNode.hbCounter >= theNode.hbFrequency ||
          send_heartbeat_now) {
        /**
         * It is now time to send a new Heartbeat
         */
        if (theNode.hbCounter >= theNode.hbFrequency) {
          theNode.m_info.m_heartbeat_cnt++;
          theNode.hbCounter = 0;
        }

        /**
         * If the node is of type REP,
         * then the receiver of the signal should be API_CLUSTERMGR,
         * otherwise QMGR.
         *
         * The receiver is chosen for every send: the signal object is
         * shared by all nodes and all loop passes, so setting it only for
         * REP nodes would leave API_CLUSTERMGR in place for every node
         * handled afterwards.
         */
        if (theNode.m_info.m_type == NodeInfo::REP) {
          signal.theReceiversBlockNumber = API_CLUSTERMGR;
        } else {
          signal.theReceiversBlockNumber = QMGR;
        }
#if 0
        ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
        theFacade.sendSignalUnCond(&signal, nodeId);
      }//if

      if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){
        reportNodeFailed(i);
      }//if
    }

    /**
     * End of secure area. Let other threads in
     */
    theFacade.unlock_mutex();

    // Sleep for 100 ms between each Registration Heartbeat
    Uint64 before = now;
    NdbSleep_MilliSleep(100);
    now = NdbTick_CurrentMillisecond();
    // Use the measured elapsed time, not the nominal 100 ms, to advance
    // the per-node heartbeat timers on the next pass.
    timeSlept = (now - before);
  }
}

#if 0
void
ClusterMgr::showState(NodeId nodeId){
  ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
  ndbout << "theNodeList      = " << theNodeList[nodeId] << endl;
  ndbout << "theNodeState     = " << theNodeState[nodeId] << endl;
  ndbout << "theNodeCount     = " << theNodeCount[nodeId] << endl;
  ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
  ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
}
#endif

/**
 * A node starts out undefined, unconnected and not alive, but flagged as
 * compatible and with node-failure handling complete.
 */
ClusterMgr::Node::Node()
  : m_state(NodeState::SL_NOTHING) {
  compatible = nfCompleteRep = true;
  connected = defined = m_alive = false;
  m_state.m_connected_nodes.clear();
}

/******************************************************************************
 * API_REGREQ and friends
 ******************************************************************************/

/**
 * Handle an incoming API_REGREQ: record the sender's version, update its
 * compatibility flag and answer with API_REGCONF.
 *
 * @param theData  signal payload, interpreted as ApiRegReq
 */
void
ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
  const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
  const NodeId nodeId = refToNode(apiRegReq->ref);

#if 0
  ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.defined == true);
  assert(node.connected == true);

  // Compatibility is re-evaluated only when the reported version changes.
  if(node.m_info.m_version != apiRegReq->version){
    node.m_info.m_version = apiRegReq->version;

    // NOTE(review): major and minor are compared independently, so a peer
    // with a higher major but lower minor number is treated as incompatible
    // - confirm whether that is intended.
    if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
        getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
      node.compatible = false;
    } else {
      node.compatible = true;
    }
  }

  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
  signal.theVerId_signalNumber   = GSN_API_REGCONF;
  signal.theReceiversBlockNumber = API_CLUSTERMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegConf::SignalLength;

  ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
  conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  conf->version = NDB_VERSION;
  conf->apiHeartbeatFrequency = node.hbFrequency;
  theFacade.sendSignalUnCond(&signal, nodeId);
}

int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;

/**
 * Handle an incoming API_REGCONF (the reply to our heartbeat).
 *
 * Updates the node's version, compatibility and node state, marks it alive
 * when it is compatible and started (or in single user mode), and resets
 * the heartbeat counters. For non-REP nodes the heartbeat interval is taken
 * from the reply.
 *
 * @param theData  signal payload, interpreted as ApiRegConf
 */
void
ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
  const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
  const NodeId nodeId = refToNode(apiRegConf->qmgrRef);

#if 0
  ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.defined == true);
  assert(node.connected == true);

  // Compatibility is re-evaluated only when the reported version changes.
  if(node.m_info.m_version != apiRegConf->version){
    node.m_info.m_version = apiRegConf->version;
    if (global_mgmt_server_check == 1)
      node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
                                               node.m_info.m_version);
    else
      node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
                                              node.m_info.m_version);
  }

  node.m_state = apiRegConf->nodeState;
  if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED  ||
                          node.m_state.startLevel == NodeState::SL_SINGLEUSER)){
    set_node_alive(node, true);
  } else {
    set_node_alive(node, false);
  }//if

  // A reply arrived: restart the missed-heartbeat accounting.
  node.m_info.m_heartbeat_cnt = 0;
  node.hbCounter = 0;
  if (node.m_info.m_type != NodeInfo::REP) {
    // NOTE(review): if apiHeartbeatFrequency * 10 is below 50 this
    // subtraction may wrap (assuming unsigned fields) - confirm the sender
    // guarantees a large enough value.
    node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
  }
}

/**
 * Handle an incoming API_REGREF (registration refused).
 *
 * Marks the node as incompatible and not alive, resets its state and stores
 * the version it reported. Aborts the process if the peer reports that this
 * node has the wrong type.
 *
 * @param theData  signal payload, interpreted as ApiRegRef
 */
void
ClusterMgr::execAPI_REGREF(const Uint32 * theData){

  ApiRegRef * ref = (ApiRegRef*)theData;

  const NodeId nodeId = refToNode(ref->ref);

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.connected == true);
  assert(node.defined == true);

  node.compatible = false;
  set_node_alive(node, false);
  node.m_state = NodeState::SL_NOTHING;
  node.m_info.m_version = ref->version;

  switch(ref->errorCode){
  case ApiRegRef::WrongType:
    ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
    abort();
  case ApiRegRef::UnsupportedVersion:
  default:
    break;
  }
}

/**
 * Handle NODE_FAILREP: report every node whose bit is set in the failure
 * bitmask as failed.
 *
 * @param theData  signal payload, interpreted as NodeFailRep
 */
void
ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
  NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
  for(int i = 1; i<MAX_NODES; i++){
    if(NodeBitmask::get(nodeFail->theNodes, i)){
      reportNodeFailed(i);
    }
  }
}

/**
 * Handle NF_COMPLETEREP: notify the facade that failure handling for the
 * node is complete and record that on the node.
 *
 * @param theData  signal payload, interpreted as NFCompleteRep
 */
void
ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){
  NFCompleteRep * const nfComp = (NFCompleteRep *)theData;

  const NodeId nodeId = nfComp->failedNodeId;
  assert(nodeId > 0 && nodeId < MAX_NODES);

  theFacade.ReportNodeFailureComplete(nodeId);
  theNodes[nodeId].nfCompleteRep = true;
}

void
ClusterMgr::reportConnected(NodeId nodeId){
  /**
   * Ensure that we are sending heartbeat every 100 ms
   * until we have got the first reply from NDB providing
   * us with the real time-out period to use.
   */
  assert(nodeId > 0 && nodeId < MAX_NODES);

  noOfConnectedNodes++;

  Node & theNode = theNodes[nodeId];
  theNode.connected = true;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -