📄 clustermgr.hpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#ifndef ClusterMgr_H#define ClusterMgr_H#include "API.hpp"#include <ndb_limits.h>#include <NdbThread.h>#include <NdbMutex.h>#include <NdbCondition.h>#include <signaldata/ArbitSignalData.hpp>#include <signaldata/NodeStateSignalData.hpp>#include <NodeInfo.hpp>#include <NodeState.hpp>extern "C" void* runClusterMgr_C(void * me);/** * @class ClusterMgr */class ClusterMgr { friend void* runClusterMgr_C(void * me); friend void execute(void *, struct SignalHeader * const, Uint8, Uint32 * const, LinearSectionPtr ptr[3]);public: ClusterMgr(class TransporterFacade &); ~ClusterMgr(); void init(struct ndb_mgm_configuration_iterator & config); void reportConnected(NodeId nodeId); void reportDisconnected(NodeId nodeId); bool checkUpgradeCompatability(Uint32 nodeVersion); void doStop(); void startThread(); private: void threadMain(); int theStop; class TransporterFacade & theFacade; public: struct Node { Node(); bool defined; bool connected; // Transporter connected bool compatible; // Version is compatible bool nfCompleteRep; // NF Complete Rep has arrived bool m_alive; // Node is alive NodeInfo m_info; NodeState m_state; /** * Heartbeat stuff */ Uint32 hbFrequency; // Heartbeat frequence Uint32 hbCounter; // # milliseconds passed since last hb sent }; const Node & getNodeInfo(NodeId) const; Uint32 getNoOfConnectedNodes() const; void hb_received(NodeId);private: Uint32 noOfAliveNodes; Uint32 noOfConnectedNodes; Node theNodes[MAX_NODES]; NdbThread* theClusterMgrThread; /** * Used for controlling start/stop of the thread */ NdbMutex* clusterMgrThreadMutex; void showState(NodeId nodeId); void reportNodeFailed(NodeId nodeId); /** * Signals received */ void execAPI_REGREQ (const Uint32 * theData); void execAPI_REGCONF (const Uint32 * theData); void execAPI_REGREF (const Uint32 * theData); void execNODE_FAILREP (const Uint32 * theData); void execNF_COMPLETEREP(const Uint32 * theData); inline void set_node_alive(Node& node, bool alive){ if(node.m_alive && !alive) { assert(noOfAliveNodes); noOfAliveNodes--; } else if(!node.m_alive && alive) { noOfAliveNodes++; } node.m_alive = alive; }};inlineconst ClusterMgr::Node &ClusterMgr::getNodeInfo(NodeId nodeId) const { return theNodes[nodeId];}inlineUint32ClusterMgr::getNoOfConnectedNodes() const { return noOfConnectedNodes;}inlinevoidClusterMgr::hb_received(NodeId nodeId) { theNodes[nodeId].m_info.m_heartbeat_cnt= 0;}/*****************************************************************************//** * @class ArbitMgr * Arbitration manager. Runs in separate thread. * Started only by a request from the kernel. */extern "C" void* runArbitMgr_C(void* me);class ArbitMgr{public: ArbitMgr(class TransporterFacade &); ~ArbitMgr(); inline void setRank(unsigned n) { theRank = n; } inline void setDelay(unsigned n) { theDelay = n; } void doStart(const Uint32* theData); void doChoose(const Uint32* theData); void doStop(const Uint32* theData); friend void* runArbitMgr_C(void* me);private: class TransporterFacade & theFacade; unsigned theRank; unsigned theDelay; void threadMain(); NdbThread* theThread; NdbMutex* theThreadMutex; // not really needed struct ArbitSignal { GlobalSignalNumber gsn; ArbitSignalData data; NDB_TICKS timestamp; inline void init(GlobalSignalNumber aGsn, const Uint32* aData) { gsn = aGsn; if (aData != NULL) memcpy(&data, aData, sizeof(data)); else memset(&data, 0, sizeof(data)); } inline void setTimestamp() { timestamp = NdbTick_CurrentMillisecond(); } inline NDB_TICKS getTimediff() { NDB_TICKS now = NdbTick_CurrentMillisecond(); return now < timestamp ? 0 : now - timestamp; } }; NdbMutex* theInputMutex; NdbCondition* theInputCond; int theInputTimeout; bool theInputFull; // the predicate ArbitSignal theInputBuffer; // shared buffer void sendSignalToThread(ArbitSignal& aSignal); enum State { // thread states StateInit, StateStarted, // thread started StateChoose1, // received one valid REQ StateChoose2, // received two valid REQs StateFinished // finished one way or other }; State theState; enum Stop { // stop code in ArbitSignal.data.code StopExit = 1, // at API exit StopRequest = 2, // request from kernel StopRestart = 3 // stop before restart }; void threadStart(ArbitSignal& aSignal); // handle thread events void threadChoose(ArbitSignal& aSignal); void threadTimeout(); void threadStop(ArbitSignal& aSignal); ArbitSignal theStartReq; ArbitSignal theChooseReq1; ArbitSignal theChooseReq2; ArbitSignal theStopOrd; void sendStartConf(ArbitSignal& aSignal, Uint32); void sendChooseRef(ArbitSignal& aSignal, Uint32); void sendChooseConf(ArbitSignal& aSignal, Uint32); void sendStopRep(ArbitSignal& aSignal, Uint32); void sendSignalToQmgr(ArbitSignal& aSignal);};#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -