📄 clustermgr.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ndb_global.h>#include <my_pthread.h>#include <ndb_limits.h>#include <ndb_version.h>#include "TransporterFacade.hpp"#include "ClusterMgr.hpp"#include <IPCConfig.hpp>#include "NdbApiSignal.hpp"#include "API.hpp"#include <NdbSleep.h>#include <NdbOut.hpp>#include <NdbTick.h>#include <signaldata/NodeFailRep.hpp>#include <signaldata/NFCompleteRep.hpp>#include <signaldata/ApiRegSignalData.hpp>#include <mgmapi.h>#include <mgmapi_configuration.hpp>#include <mgmapi_config_parameters.h>int global_flag_send_heartbeat_now= 0;// Just a C wrapper for threadMainextern "C" void*runClusterMgr_C(void * me){ ((ClusterMgr*) me)->threadMain(); /** * Sleep to allow another thread that is not exiting to take control * of signals allocated by this thread * * see Ndb::~Ndb() in Ndbinit.cpp */ #ifdef NDB_OSE NdbSleep_MilliSleep(50);#endif return NULL;}extern "C" { void ndbSetOwnVersion();}ClusterMgr::ClusterMgr(TransporterFacade & _facade): theStop(0), theFacade(_facade){ DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; DBUG_VOID_RETURN;}ClusterMgr::~ClusterMgr(){ DBUG_ENTER("ClusterMgr::~ClusterMgr"); doStop(); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN;}voidClusterMgr::init(ndb_mgm_configuration_iterator & iter){ for(iter.first(); iter.valid(); iter.next()){ Uint32 tmp = 0; if(iter.get(CFG_NODE_ID, &tmp)) continue; theNodes[tmp].defined = true;#if 0 ndbout << "--------------------------------------" << endl; ndbout << "--------------------------------------" << endl; ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));#endif unsigned type; if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; switch(type){ case NODE_TYPE_DB: theNodes[tmp].m_info.m_type = NodeInfo::DB; break; case NODE_TYPE_API: theNodes[tmp].m_info.m_type = NodeInfo::API; break; case NODE_TYPE_MGM: theNodes[tmp].m_info.m_type = NodeInfo::MGM; break; case NODE_TYPE_REP: theNodes[tmp].m_info.m_type = NodeInfo::REP; break; case NODE_TYPE_EXT_REP: theNodes[tmp].m_info.m_type = NodeInfo::REP; { Uint32 hbFreq = 10000; //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq); theNodes[tmp].hbFrequency = hbFreq; assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000); } break; default: type = type;#if 0 ndbout_c("ClusterMgr: Unknown node type: %d", type);#endif } }}voidClusterMgr::startThread() { NdbMutex_Lock(clusterMgrThreadMutex); theStop = 0; theClusterMgrThread = NdbThread_Create(runClusterMgr_C, (void**)this, 32768, "ndb_clustermgr", NDB_THREAD_PRIO_LOW); NdbMutex_Unlock(clusterMgrThreadMutex);}voidClusterMgr::doStop( ){ DBUG_ENTER("ClusterMgr::doStop"); NdbMutex_Lock(clusterMgrThreadMutex); if(theStop){ NdbMutex_Unlock(clusterMgrThreadMutex); DBUG_VOID_RETURN; } void *status; theStop = 1; if (theClusterMgrThread) { NdbThread_WaitFor(theClusterMgrThread, &status); NdbThread_Destroy(&theClusterMgrThread); } NdbMutex_Unlock(clusterMgrThreadMutex); DBUG_VOID_RETURN;}voidClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGREQ; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ApiRegReq::SignalLength; ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); req->version = NDB_VERSION; Uint32 timeSlept = 100; Uint64 now = NdbTick_CurrentMillisecond(); while(!theStop){ /** * Start of Secure area for use of Transporter */ int send_heartbeat_now= global_flag_send_heartbeat_now; global_flag_send_heartbeat_now= 0; theFacade.lock_mutex(); for (int i = 1; i < MAX_NODES; i++){ /** * Send register request (heartbeat) to all available nodes * at specified timing intervals */ const NodeId nodeId = i; Node & theNode = theNodes[nodeId]; if (!theNode.defined) continue; if (theNode.connected == false){ theFacade.doConnect(nodeId); continue; } if (!theNode.compatible){ continue; } theNode.hbCounter += timeSlept; if (theNode.hbCounter >= theNode.hbFrequency || send_heartbeat_now) { /** * It is now time to send a new Heartbeat */ if (theNode.hbCounter >= theNode.hbFrequency) { theNode.m_info.m_heartbeat_cnt++; theNode.hbCounter = 0; } /** * If the node is of type REP, * then the receiver of the signal should be API_CLUSTERMGR */ if (theNode.m_info.m_type == NodeInfo::REP) { signal.theReceiversBlockNumber = API_CLUSTERMGR; }#if 0 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);#endif theFacade.sendSignalUnCond(&signal, nodeId); }//if if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){ reportNodeFailed(i); }//if } /** * End of secure area. Let other threads in */ theFacade.unlock_mutex(); // Sleep for 100 ms between each Registration Heartbeat Uint64 before = now; NdbSleep_MilliSleep(100); now = NdbTick_CurrentMillisecond(); timeSlept = (now - before); }}#if 0voidClusterMgr::showState(NodeId nodeId){ ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl; ndbout << "theNodeList = " << theNodeList[nodeId] << endl; ndbout << "theNodeState = " << theNodeState[nodeId] << endl; ndbout << "theNodeCount = " << theNodeCount[nodeId] << endl; ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl; ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;}#endifClusterMgr::Node::Node() : m_state(NodeState::SL_NOTHING) { compatible = nfCompleteRep = true; connected = defined = m_alive = false; m_state.m_connected_nodes.clear();}/****************************************************************************** * API_REGREQ and friends ******************************************************************************/voidClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref);#if 0 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);#endif assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); if(node.m_info.m_version != apiRegReq->version){ node.m_info.m_version = apiRegReq->version; if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) || getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) { node.compatible = false; } else { node.compatible = true; } } NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGCONF; signal.theReceiversBlockNumber = API_CLUSTERMGR; signal.theTrace = 0; signal.theLength = ApiRegConf::SignalLength; ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend()); conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId()); conf->version = NDB_VERSION; conf->apiHeartbeatFrequency = node.hbFrequency; theFacade.sendSignalUnCond(&signal, nodeId);}int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;voidClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); #if 0 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);#endif assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); if(node.m_info.m_version != apiRegConf->version){ node.m_info.m_version = apiRegConf->version; if (global_mgmt_server_check == 1) node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, node.m_info.m_version); else node.compatible = ndbCompatible_api_ndb(NDB_VERSION, node.m_info.m_version); } node.m_state = apiRegConf->nodeState; if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED || node.m_state.startLevel == NodeState::SL_SINGLEUSER)){ set_node_alive(node, true); } else { set_node_alive(node, false); }//if node.m_info.m_heartbeat_cnt = 0; node.hbCounter = 0; if (node.m_info.m_type != NodeInfo::REP) { node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; }}voidClusterMgr::execAPI_REGREF(const Uint32 * theData){ ApiRegRef * ref = (ApiRegRef*)theData; const NodeId nodeId = refToNode(ref->ref); assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.connected == true); assert(node.defined == true); node.compatible = false; set_node_alive(node, false); node.m_state = NodeState::SL_NOTHING; node.m_info.m_version = ref->version; switch(ref->errorCode){ case ApiRegRef::WrongType: ndbout_c("Node %d reports that this node should be a NDB node", nodeId); abort(); case ApiRegRef::UnsupportedVersion: default: break; }}voidClusterMgr::execNODE_FAILREP(const Uint32 * theData){ NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0]; for(int i = 1; i<MAX_NODES; i++){ if(NodeBitmask::get(nodeFail->theNodes, i)){ reportNodeFailed(i); } }}voidClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){ NFCompleteRep * const nfComp = (NFCompleteRep *)theData; const NodeId nodeId = nfComp->failedNodeId; assert(nodeId > 0 && nodeId < MAX_NODES); theFacade.ReportNodeFailureComplete(nodeId); theNodes[nodeId].nfCompleteRep = true;}voidClusterMgr::reportConnected(NodeId nodeId){ /** * Ensure that we are sending heartbeat every 100 ms * until we have got the first reply from NDB providing * us with the real time-out period to use. */ assert(nodeId > 0 && nodeId < MAX_NODES); noOfConnectedNodes++; Node & theNode = theNodes[nodeId]; theNode.connected = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -