clustermgr.cpp
来自「MySQL源码文件5.X系列, 可自已编译到服务器」· C++ 代码 · 共 915 行 · 第 1/2 页
CPP
915 行
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include <ndb_version.h>

#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
#include "NdbApiSignal.hpp"
#include "API.hpp"
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <NdbTick.h>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/ApiRegSignalData.hpp>

#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>

//#define DEBUG_REG

/**
 * C-linkage thread entry point: forwards to ClusterMgr::threadMain().
 *
 * @param me  the ClusterMgr instance, passed as an untyped pointer
 * @return    always NULL
 */
extern "C"
void*
runClusterMgr_C(void * me)
{
  ((ClusterMgr*) me)->threadMain();
  /**
   * Sleep to allow another thread that is not exiting to take control
   * of signals allocated by this thread
   *
   * see Ndb::~Ndb() in Ndbinit.cpp
   */
#ifdef NDB_OSE
  NdbSleep_MilliSleep(50);
#endif
  return NULL;
}

extern "C" {
  void ndbSetOwnVersion();
}

/**
 * Construct the cluster manager for the given transporter facade.
 *
 * Creates the thread mutex and the heartbeat condition variable and
 * zeroes the counters. The heartbeat thread itself is not started here;
 * see startThread().
 */
ClusterMgr::ClusterMgr(TransporterFacade & _facade):
  theStop(0),
  theFacade(_facade)
{
  DBUG_ENTER("ClusterMgr::ClusterMgr");
  ndbSetOwnVersion();
  clusterMgrThreadMutex = NdbMutex_Create();
  waitForHBCond= NdbCondition_Create();
  waitingForHB= false;
  noOfAliveNodes= 0;
  noOfConnectedNodes= 0;
  theClusterMgrThread= 0;
  m_connect_count = 0;
  DBUG_VOID_RETURN;
}

/**
 * Stop the heartbeat thread (via doStop()) and release the condition
 * variable and mutex created by the constructor.
 */
ClusterMgr::~ClusterMgr()
{
  DBUG_ENTER("ClusterMgr::~ClusterMgr");
  doStop();
  NdbCondition_Destroy(waitForHBCond);
  NdbMutex_Destroy(clusterMgrThreadMutex);
  DBUG_VOID_RETURN;
}

/**
 * Populate theNodes[] from the configuration iterator.
 *
 * Every entry that yields a CFG_NODE_ID is marked as defined; its
 * CFG_TYPE_OF_SECTION value is then mapped onto a NodeInfo type.
 * Entries for which either lookup returns non-zero are skipped.
 */
void
ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
  for(iter.first(); iter.valid(); iter.next()){
    Uint32 tmp = 0;
    if(iter.get(CFG_NODE_ID, &tmp))
      continue;

    // NOTE(review): tmp is used as an index without a range check here;
    // presumably the configuration guarantees tmp < MAX_NODES - confirm.
    theNodes[tmp].defined = true;
#if 0
    ndbout << "--------------------------------------" << endl;
    ndbout << "--------------------------------------" << endl;
    ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
#endif

    unsigned type;
    if(iter.get(CFG_TYPE_OF_SECTION, &type))
      continue;

    switch(type){
    case NODE_TYPE_DB:
      theNodes[tmp].m_info.m_type = NodeInfo::DB;
      break;
    case NODE_TYPE_API:
      theNodes[tmp].m_info.m_type = NodeInfo::API;
      break;
    case NODE_TYPE_MGM:
      theNodes[tmp].m_info.m_type = NodeInfo::MGM;
      break;
    case NODE_TYPE_REP:
      theNodes[tmp].m_info.m_type = NodeInfo::REP;
      break;
    case NODE_TYPE_EXT_REP:
      theNodes[tmp].m_info.m_type = NodeInfo::REP;
      {
        // Fixed heartbeat frequency for external REP nodes; the lookup of
        // a configured value is commented out in the original.
        Uint32 hbFreq = 10000;
        //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq);
        theNodes[tmp].hbFrequency = hbFreq;
        assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000);
      }
      break;
    default:
      // Self-assignment has no effect; presumably kept to silence an
      // unused-variable warning while the diagnostic below is disabled.
      type = type;
#if 0
      ndbout_c("ClusterMgr: Unknown node type: %d", type);
#endif
    }
  }
}

/**
 * Clear the stop flag and create the heartbeat thread, which runs
 * runClusterMgr_C() with this object as its argument.
 */
void
ClusterMgr::startThread() {
  NdbMutex_Lock(clusterMgrThreadMutex);

  theStop = 0;

  theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
                                         (void**)this,
                                         32768,
                                         "ndb_clustermgr",
                                         NDB_THREAD_PRIO_LOW);
  NdbMutex_Unlock(clusterMgrThreadMutex);
}

/**
 * Request the heartbeat thread to stop and wait for it to finish.
 *
 * A second call while theStop is already set returns immediately.
 * clusterMgrThreadMutex is held for the whole call, including the wait.
 */
void
ClusterMgr::doStop( ){
  DBUG_ENTER("ClusterMgr::doStop");
  NdbMutex_Lock(clusterMgrThreadMutex);
  if(theStop){
    NdbMutex_Unlock(clusterMgrThreadMutex);
    DBUG_VOID_RETURN;
  }
  void *status;
  theStop = 1;
  if (theClusterMgrThread) {
    NdbThread_WaitFor(theClusterMgrThread, &status);
    NdbThread_Destroy(&theClusterMgrThread);
  }
  NdbMutex_Unlock(clusterMgrThreadMutex);
  DBUG_VOID_RETURN;
}

/**
 * Send an immediate API_REGREQ to the DB nodes and wait for the replies.
 *
 * The target set is the union of m_connected_nodes of every defined DB
 * node, restricted to DB nodes. The caller then blocks on waitForHBCond
 * with a timeout of 1000 (presumably milliseconds - confirm against
 * NdbCondition_WaitTimeout).
 *
 * If another caller is already waiting, this call only waits on the same
 * condition and returns without sending anything.
 */
void
ClusterMgr::forceHB()
{
  theFacade.lock_mutex();

  if(waitingForHB)
  {
    NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
    theFacade.unlock_mutex();
    return;
  }

  waitingForHB= true;

  NodeBitmask ndb_nodes;
  ndb_nodes.clear();
  waitForHBFromNodes.clear();
  for(Uint32 i = 0; i < MAX_NODES; i++)
  {
    if(!theNodes[i].defined)
      continue;
    if(theNodes[i].m_info.m_type == NodeInfo::DB)
    {
      ndb_nodes.set(i);
      const ClusterMgr::Node &node= getNodeInfo(i);
      waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
    }
  }
  // Only wait for nodes that are themselves DB nodes.
  waitForHBFromNodes.bitAND(ndb_nodes);

#ifdef DEBUG_REG
  char buf[128];
  ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber   = GSN_API_REGREQ;
  signal.theReceiversBlockNumber = QMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegReq::SignalLength;

  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  req->version = NDB_VERSION;

  // Walk the set bits of waitForHBFromNodes.
  int nodeId= 0;
  for(int i=0;
      (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
      i= nodeId+1)
  {
#ifdef DEBUG_REG
    ndbout << "FORCE HB to " << nodeId << endl;
#endif
    theFacade.sendSignalUnCond(&signal, nodeId);
  }

  NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
  waitingForHB= false;
#ifdef DEBUG_REG
  ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
  theFacade.unlock_mutex();
}

/**
 * Body of the heartbeat thread; runs until theStop becomes non-zero.
 *
 * Each pass, under the facade mutex, visits node ids 1..MAX_NODES-1:
 *  - undefined nodes are skipped;
 *  - for nodes that are not connected, doConnect() is called;
 *  - incompatible nodes are skipped;
 *  - otherwise the time slept since the previous pass is added to the
 *    node's hbCounter and, once it reaches hbFrequency, an API_REGREQ is
 *    sent and the node's missed-heartbeat count is incremented;
 *  - a node whose missed-heartbeat count is exactly 4 (with a non-zero
 *    hbFrequency) is reported as failed.
 * The thread then releases the mutex and sleeps 100 ms.
 */
void
ClusterMgr::threadMain( ){
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber   = GSN_API_REGREQ;
  signal.theReceiversBlockNumber = QMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegReq::SignalLength;

  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  req->version = NDB_VERSION;

  Uint32 timeSlept = 100;
  Uint64 now = NdbTick_CurrentMillisecond();

  while(!theStop){
    /**
     * Start of Secure area for use of Transporter
     */
    theFacade.lock_mutex();
    for (int i = 1; i < MAX_NODES; i++){
      /**
       * Send register request (heartbeat) to all available nodes
       * at specified timing intervals
       */
      const NodeId nodeId = i;
      Node & theNode = theNodes[nodeId];

      if (!theNode.defined)
        continue;

      if (theNode.connected == false){
        theFacade.doConnect(nodeId);
        continue;
      }

      if (!theNode.compatible){
        continue;
      }

      theNode.hbCounter += timeSlept;
      if (theNode.hbCounter >= theNode.hbFrequency) {
        /**
         * It is now time to send a new Heartbeat
         */
        theNode.m_info.m_heartbeat_cnt++;
        theNode.hbCounter = 0;

        /**
         * If the node is of type REP,
         * then the receiver of the signal should be API_CLUSTERMGR,
         * otherwise QMGR.
         *
         * The signal object is shared by all nodes, so the receiver
         * block must be chosen for every send; leaving it at
         * API_CLUSTERMGR after a REP node would misaddress the
         * heartbeats to all nodes that follow.
         */
        if (theNode.m_info.m_type == NodeInfo::REP) {
          signal.theReceiversBlockNumber = API_CLUSTERMGR;
        } else {
          signal.theReceiversBlockNumber = QMGR;
        }
#ifdef DEBUG_REG
        ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
        theFacade.sendSignalUnCond(&signal, nodeId);
      }//if

      if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){
        reportNodeFailed(i);
      }//if
    }

    /**
     * End of secure area. Let other threads in
     */
    theFacade.unlock_mutex();

    // Sleep for 100 ms between each Registration Heartbeat
    Uint64 before = now;
    NdbSleep_MilliSleep(100);
    now = NdbTick_CurrentMillisecond();
    timeSlept = (now - before);
  }
}

#if 0
void
ClusterMgr::showState(NodeId nodeId){
  ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
  ndbout << "theNodeList      = " << theNodeList[nodeId] << endl;
  ndbout << "theNodeState     = " << theNodeState[nodeId] << endl;
  ndbout << "theNodeCount     = " << theNodeCount[nodeId] << endl;
  ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
  ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
}
#endif

/**
 * Default node record: compatible, NF-complete, but neither defined,
 * connected nor alive, with an empty connected-nodes set.
 */
ClusterMgr::Node::Node()
  : m_state(NodeState::SL_NOTHING) {
  compatible = nfCompleteRep = true;
  connected = defined = m_alive = false;
  m_state.m_connected_nodes.clear();
}

/******************************************************************************
 * API_REGREQ and friends
 ******************************************************************************/

/**
 * Handle an incoming API_REGREQ: record the sender's version, update its
 * compatibility flag, and answer with an API_REGCONF.
 *
 * @param theData  signal payload, interpreted as an ApiRegReq
 */
void
ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
  const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
  const NodeId nodeId = refToNode(apiRegReq->ref);

#ifdef DEBUG_REG
  ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.defined == true);
  assert(node.connected == true);

  if(node.m_info.m_version != apiRegReq->version){
    node.m_info.m_version = apiRegReq->version;

    // NOTE(review): major and minor are compared independently, so a peer
    // with a higher major but lower minor is treated as incompatible -
    // confirm this is intended.
    if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
        getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
      node.compatible = false;
    } else {
      node.compatible = true;
    }
  }

  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
  signal.theVerId_signalNumber   = GSN_API_REGCONF;
  signal.theReceiversBlockNumber = API_CLUSTERMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegConf::SignalLength;

  ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
  conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  conf->version = NDB_VERSION;
  conf->apiHeartbeatFrequency = node.hbFrequency;
  theFacade.sendSignalUnCond(&signal, nodeId);
}

int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;

/**
 * Handle an incoming API_REGCONF (heartbeat reply).
 *
 * Updates the sender's version/compatibility and node state, marks it
 * alive when it is compatible and started (or in single-user mode),
 * resets its heartbeat counters, and wakes forceHB() waiters once every
 * awaited node has answered.
 *
 * @param theData  signal payload, interpreted as an ApiRegConf
 */
void
ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
  const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
  const NodeId nodeId = refToNode(apiRegConf->qmgrRef);

#ifdef DEBUG_REG
  ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.defined == true);
  assert(node.connected == true);

  if(node.m_info.m_version != apiRegConf->version){
    node.m_info.m_version = apiRegConf->version;
    if (global_mgmt_server_check == 1)
      node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
                                               node.m_info.m_version);
    else
      node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
                                              node.m_info.m_version);
  }

  node.m_state = apiRegConf->nodeState;
  if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED  ||
                          node.m_state.startLevel == NodeState::SL_SINGLEUSER)){
    set_node_alive(node, true);
  } else {
    set_node_alive(node, false);
  }//if
  node.m_info.m_heartbeat_cnt = 0;
  node.hbCounter = 0;
  if (node.m_info.m_type != NodeInfo::REP) {
    // NOTE(review): if hbFrequency is unsigned, a reported frequency
    // below 5 makes this subtraction wrap - confirm the sender's range.
    node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
  }

  if(waitingForHB)
  {
    waitForHBFromNodes.clear(nodeId);

    if(waitForHBFromNodes.isclear())
    {
      waitingForHB= false;
      NdbCondition_Broadcast(waitForHBCond);
    }
  }
}

// Handles API_REGREF. The definition continues past the end of this
// excerpt; only the visible part is reproduced, unchanged.
void
ClusterMgr::execAPI_REGREF(const Uint32 * theData){

  ApiRegRef * ref = (ApiRegRef*)theData;

  const NodeId nodeId = refToNode(ref->ref);

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.connected == true);
  assert(node.defined == true);

  node.compatible = false;
  set_node_alive(node, false);
  node.m_state = NodeState::SL_NOTHING;
  node.m_info.m_version = ref->version;

  switch(ref->errorCode){
  case ApiRegRef::WrongType:
    ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
    abort();
  case ApiRegRef::UnsupportedVersion:
  default:
    break;
  }

  waitForHBFromNodes.clear(nodeId);
  if(waitForHBFromNodes.isclear())
    NdbCondition_Signal(waitForHBCond);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?