📄 qmgrmain.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#define QMGR_C#include "Qmgr.hpp"#include <pc.hpp>#include <NdbTick.h>#include <signaldata/EventReport.hpp>#include <signaldata/StartOrd.hpp>#include <signaldata/CmInit.hpp>#include <signaldata/CloseComReqConf.hpp>#include <signaldata/PrepFailReqRef.hpp>#include <signaldata/NodeFailRep.hpp>#include <signaldata/ReadNodesConf.hpp>#include <signaldata/NFCompleteRep.hpp>#include <signaldata/CheckNodeGroups.hpp>#include <signaldata/ArbitSignalData.hpp>#include <signaldata/ApiRegSignalData.hpp>#include <signaldata/ApiVersion.hpp>#include <signaldata/BlockCommitOrd.hpp>#include <signaldata/FailRep.hpp>#include <signaldata/DisconnectRep.hpp>#include <signaldata/ApiBroadcast.hpp>#include <ndb_version.h>#ifdef DEBUG_ARBIT#include <NdbOut.hpp>#endif//#define DEBUG_QMGR_START#ifdef DEBUG_QMGR_START#include <DebuggerNames.hpp>#define DEBUG(x) ndbout << "QMGR " << __LINE__ << ": " << x << endl#define DEBUG_START(gsn, node, msg) DEBUG(getSignalName(gsn) << " to: " << node << " - " << msg)#define DEBUG_START2(gsn, rg, msg) { char nodes[255]; DEBUG(getSignalName(gsn) << " to: " << rg.m_nodes.getText(nodes) << " - " << msg); }#define DEBUG_START3(signal, msg) DEBUG(getSignalName(signal->header.theVerId_signalNumber) << " from " << refToNode(signal->getSendersBlockRef()) << " - " << msg);#else#define DEBUG(x)#define DEBUG_START(gsn, node, msg)#define DEBUG_START2(gsn, rg, msg)#define DEBUG_START3(signal, msg)#endif/** * c_start.m_gsn = GSN_CM_REGREQ * Possible for all nodes * c_start.m_nodes contains all nodes in config * * c_start.m_gsn = GSN_CM_NODEINFOREQ; * Set when receiving CM_REGCONF * State possible for starting node only (not in cluster) * * c_start.m_nodes contains all node in alive cluster that * that has not replied to GSN_CM_NODEINFOREQ * passed by president in GSN_CM_REGCONF * * c_start.m_gsn = GSN_CM_ADD * Possible for president only * Set when receiving and accepting CM_REGREQ (to include node) * * c_start.m_nodes contains all nodes in alive cluster + starting node * that has not replied to GSN_CM_ADD * by sending GSN_CM_ACKADD * * c_start.m_gsn = GSN_CM_NODEINFOCONF * Possible for non presidents only * c_start.m_nodes contains a node that has been accepted by president * but has not connected to us yet */// Signal entries and statement blocks/* 4 P R O G R A M *//*******************************//* CMHEART_BEAT *//*******************************/void Qmgr::execCM_HEARTBEAT(Signal* signal) { NodeRecPtr hbNodePtr; jamEntry(); hbNodePtr.i = signal->theData[0]; ptrCheckGuard(hbNodePtr, MAX_NDB_NODES, nodeRec); setNodeInfo(hbNodePtr.i).m_heartbeat_cnt= 0; return;}//Qmgr::execCM_HEARTBEAT()/*******************************//* CM_NODEINFOREF *//*******************************/void Qmgr::execCM_NODEINFOREF(Signal* signal) { jamEntry(); systemErrorLab(signal, __LINE__); return;}//Qmgr::execCM_NODEINFOREF()/*******************************//* CONTINUEB *//*******************************/void Qmgr::execCONTINUEB(Signal* signal) { jamEntry(); const Uint32 tcontinuebType = signal->theData[0]; const Uint32 tdata0 = signal->theData[1]; const Uint32 tdata1 = signal->theData[2]; switch (tcontinuebType) { case ZREGREQ_TIMELIMIT: jam(); if (c_start.m_startKey != tdata0 || c_start.m_startNode != tdata1) { jam(); return; }//if regreqTimeLimitLab(signal); break; case ZREGREQ_MASTER_TIMELIMIT: jam(); if (c_start.m_startKey != tdata0 || c_start.m_startNode != tdata1) { jam(); return; }//if //regreqMasterTimeLimitLab(signal); failReportLab(signal, c_start.m_startNode, FailRep::ZSTART_IN_REGREQ); return; break; case ZTIMER_HANDLING: jam(); timerHandlingLab(signal); return; break; case ZARBIT_HANDLING: jam(); runArbitThread(signal); return; break; case ZSTART_FAILURE_LIMIT:{ if (cpresident != ZNIL) { jam(); return; } Uint64 now = NdbTick_CurrentMillisecond(); if (now > (c_start_election_time + c_restartFailureTimeout)) { jam(); BaseString tmp; tmp.append("Shutting down node as total restart time exceeds " " StartFailureTimeout as set in config file "); if(c_restartFailureTimeout == ~0) tmp.append(" 0 (inifinite)"); else tmp.appfmt(" %d", c_restartFailureTimeout); progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, tmp.c_str()); } signal->theData[0] = ZSTART_FAILURE_LIMIT; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 3000, 1); return; } default: jam(); // ZCOULD_NOT_OCCUR_ERROR; systemErrorLab(signal, __LINE__); return; break; }//switch return;}//Qmgr::execCONTINUEB()void Qmgr::execDEBUG_SIG(Signal* signal) { NodeRecPtr debugNodePtr; jamEntry(); debugNodePtr.i = signal->theData[0]; ptrCheckGuard(debugNodePtr, MAX_NODES, nodeRec); return;}//Qmgr::execDEBUG_SIG()/*******************************//* FAIL_REP *//*******************************/void Qmgr::execFAIL_REP(Signal* signal) { const FailRep * const failRep = (FailRep *)&signal->theData[0]; const NodeId failNodeId = failRep->failNodeId; const FailRep::FailCause failCause = (FailRep::FailCause)failRep->failCause; jamEntry(); failReportLab(signal, failNodeId, failCause); return;}//Qmgr::execFAIL_REP()/*******************************//* PRES_TOREQ *//*******************************/void Qmgr::execPRES_TOREQ(Signal* signal) { jamEntry(); BlockReference Tblockref = signal->theData[0]; signal->theData[0] = getOwnNodeId(); signal->theData[1] = ccommitFailureNr; sendSignal(Tblockref, GSN_PRES_TOCONF, signal, 2, JBA); return;}//Qmgr::execPRES_TOREQ()void Qmgr::execREAD_CONFIG_REQ(Signal* signal){ jamEntry(); const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); ndbrequire(p != 0); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(ref, GSN_READ_CONFIG_CONF, signal, ReadConfigConf::SignalLength, JBB);}/*4.2 ADD NODE MODULE*//*##########################################################################*//*4.2.1 STTOR *//**-------------------------------------------------------------------------- * Start phase signal, must be handled by all blocks. * QMGR is only interested in the first phase. * During phase one we clear all registered applications. *---------------------------------------------------------------------------*//*******************************//* STTOR *//*******************************/void Qmgr::execSTTOR(Signal* signal) { jamEntry(); switch(signal->theData[1]){ case 1: initData(signal); startphase1(signal); return; case 7: cactivateApiCheck = 1; /** * Start arbitration thread. This could be done as soon as * we have all nodes (or a winning majority). */ if (cpresident == getOwnNodeId()) handleArbitStart(signal); break; } sendSttorryLab(signal); return;}//Qmgr::execSTTOR()void Qmgr::sendSttorryLab(Signal* signal) {/****************************<*//*< STTORRY <*//****************************<*/ signal->theData[3] = 7; signal->theData[4] = 255; sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB); return;}//Qmgr::sendSttorryLab()void Qmgr::startphase1(Signal* signal) { jamEntry(); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrAss(nodePtr, nodeRec); nodePtr.p->phase = ZSTARTING; nodePtr.p->blockRef = reference(); c_connectedNodes.set(nodePtr.i); signal->theData[0] = reference(); sendSignal(DBDIH_REF, GSN_DIH_RESTARTREQ, signal, 1, JBB); return;}voidQmgr::execDIH_RESTARTREF(Signal*signal){ jamEntry(); c_start.m_latest_gci = 0; execCM_INFOCONF(signal);}voidQmgr::execDIH_RESTARTCONF(Signal*signal){ jamEntry(); c_start.m_latest_gci = signal->theData[1]; execCM_INFOCONF(signal);}void Qmgr::setHbDelay(UintR aHbDelay){ hb_send_timer.setDelay(aHbDelay < 10 ? 10 : aHbDelay); hb_send_timer.reset(); hb_check_timer.setDelay(aHbDelay < 10 ? 10 : aHbDelay); hb_check_timer.reset();}void Qmgr::setHbApiDelay(UintR aHbApiDelay){ chbApiDelay = (aHbApiDelay < 100 ? 100 : aHbApiDelay); hb_api_timer.setDelay(chbApiDelay); hb_api_timer.reset();}void Qmgr::setArbitTimeout(UintR aArbitTimeout){ arbitRec.timeout = (aArbitTimeout < 10 ? 10 : aArbitTimeout);}void Qmgr::execCONNECT_REP(Signal* signal){ jamEntry(); const Uint32 nodeId = signal->theData[0]; if (ERROR_INSERTED(931)) { jam(); ndbout_c("Discarding CONNECT_REP(%d)", nodeId); infoEvent("Discarding CONNECT_REP(%d)", nodeId); return; } c_connectedNodes.set(nodeId); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrCheckGuard(nodePtr, MAX_NODES, nodeRec); switch(nodePtr.p->phase){ case ZRUNNING: ndbrequire(!c_clusterNodes.get(nodeId)); case ZSTARTING: jam(); break; case ZPREPARE_FAIL: case ZFAIL_CLOSING: jam(); return; case ZINIT: ndbrequire(false); case ZAPI_ACTIVE: case ZAPI_INACTIVE: return; } if (getNodeInfo(nodeId).getType() != NodeInfo::DB) { jam(); return; } switch(c_start.m_gsn){ case GSN_CM_REGREQ: jam(); sendCmRegReq(signal, nodeId); /** * We're waiting for CM_REGCONF c_start.m_nodes contains all configured * nodes */ ndbrequire(nodePtr.p->phase == ZSTARTING); ndbrequire(c_start.m_nodes.isWaitingFor(nodeId)); return; case GSN_CM_NODEINFOREQ: jam(); if (c_start.m_nodes.isWaitingFor(nodeId)) { jam(); ndbrequire(getOwnNodeId() != cpresident); ndbrequire(nodePtr.p->phase == ZSTARTING); sendCmNodeInfoReq(signal, nodeId, nodePtr.p); return; } return; case GSN_CM_NODEINFOCONF:{ jam(); ndbrequire(getOwnNodeId() != cpresident); ndbrequire(nodePtr.p->phase == ZRUNNING); if (c_start.m_nodes.isWaitingFor(nodeId)) { jam(); c_start.m_nodes.clearWaitingFor(nodeId); c_start.m_gsn = RNIL; NodeRecPtr addNodePtr; addNodePtr.i = nodeId; ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec); cmAddPrepare(signal, addNodePtr, nodePtr.p); return; } } default: (void)1; } ndbrequire(!c_start.m_nodes.isWaitingFor(nodeId)); ndbrequire(!c_readnodes_nodes.get(nodeId)); c_readnodes_nodes.set(nodeId); signal->theData[0] = reference(); sendSignal(calcQmgrBlockRef(nodeId), GSN_READ_NODESREQ, signal, 1, JBA); return;}//Qmgr::execCONNECT_REP()voidQmgr::execREAD_NODESCONF(Signal* signal){ check_readnodes_reply(signal,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -