📄 qmgrmain.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#define QMGR_C#include "Qmgr.hpp"#include <pc.hpp>#include <NdbTick.h>#include <signaldata/EventReport.hpp>#include <signaldata/StartOrd.hpp>#include <signaldata/CmInit.hpp>#include <signaldata/CloseComReqConf.hpp>#include <signaldata/PrepFailReqRef.hpp>#include <signaldata/NodeFailRep.hpp>#include <signaldata/ReadNodesConf.hpp>#include <signaldata/NFCompleteRep.hpp>#include <signaldata/CheckNodeGroups.hpp>#include <signaldata/ArbitSignalData.hpp>#include <signaldata/ApiRegSignalData.hpp>#include <signaldata/ApiVersion.hpp>#include <signaldata/BlockCommitOrd.hpp>#include <signaldata/FailRep.hpp>#include <signaldata/DisconnectRep.hpp>#include <signaldata/ApiBroadcast.hpp>#include <ndb_version.h>#ifdef DEBUG_ARBIT#include <NdbOut.hpp>#endif//#define DEBUG_QMGR_START#ifdef DEBUG_QMGR_START#include <DebuggerNames.hpp>#define DEBUG(x) ndbout << "QMGR " << __LINE__ << ": " << x << endl#define DEBUG_START(gsn, node, msg) DEBUG(getSignalName(gsn) << " to: " << node << " - " << msg)#define DEBUG_START2(gsn, rg, msg) { char nodes[255]; DEBUG(getSignalName(gsn) << " to: " << rg.m_nodes.getText(nodes) << " - " << msg); }#define DEBUG_START3(signal, msg) DEBUG(getSignalName(signal->header.theVerId_signalNumber) << " from " << refToNode(signal->getSendersBlockRef()) << " - " << msg);#else#define DEBUG(x)#define DEBUG_START(gsn, node, msg)#define DEBUG_START2(gsn, rg, msg)#define DEBUG_START3(signal, msg)#endif// Signal entries and statement blocks/* 4 P R O G R A M *//*******************************//* CMHEART_BEAT *//*******************************/void Qmgr::execCM_HEARTBEAT(Signal* signal) { NodeRecPtr hbNodePtr; jamEntry(); hbNodePtr.i = signal->theData[0]; ptrCheckGuard(hbNodePtr, MAX_NDB_NODES, nodeRec); setNodeInfo(hbNodePtr.i).m_heartbeat_cnt= 0; return;}//Qmgr::execCM_HEARTBEAT()/*******************************//* CM_NODEINFOREF *//*******************************/void Qmgr::execCM_NODEINFOREF(Signal* signal) { jamEntry(); systemErrorLab(signal, __LINE__); return;}//Qmgr::execCM_NODEINFOREF()/*******************************//* CONTINUEB *//*******************************/void Qmgr::execCONTINUEB(Signal* signal) { jamEntry(); const Uint32 tcontinuebType = signal->theData[0]; const Uint32 tdata0 = signal->theData[1]; const Uint32 tdata1 = signal->theData[2]; switch (tcontinuebType) { case ZREGREQ_TIMELIMIT: jam(); if (c_start.m_startKey != tdata0 || c_start.m_startNode != tdata1) { jam(); return; }//if regreqTimeLimitLab(signal); break; case ZREGREQ_MASTER_TIMELIMIT: jam(); if (c_start.m_startKey != tdata0 || c_start.m_startNode != tdata1) { jam(); return; }//if //regreqMasterTimeLimitLab(signal); failReportLab(signal, c_start.m_startNode, FailRep::ZSTART_IN_REGREQ); return; break; case ZTIMER_HANDLING: jam(); timerHandlingLab(signal); return; break; case ZARBIT_HANDLING: jam(); runArbitThread(signal); return; break; default: jam(); // ZCOULD_NOT_OCCUR_ERROR; systemErrorLab(signal, __LINE__); return; break; }//switch return;}//Qmgr::execCONTINUEB()void Qmgr::execDEBUG_SIG(Signal* signal) { NodeRecPtr debugNodePtr; jamEntry(); debugNodePtr.i = signal->theData[0]; ptrCheckGuard(debugNodePtr, MAX_NODES, nodeRec); return;}//Qmgr::execDEBUG_SIG()/*******************************//* FAIL_REP *//*******************************/void Qmgr::execFAIL_REP(Signal* signal) { const FailRep * const failRep = (FailRep *)&signal->theData[0]; const NodeId failNodeId = failRep->failNodeId; const FailRep::FailCause failCause = (FailRep::FailCause)failRep->failCause; jamEntry(); failReportLab(signal, failNodeId, failCause); return;}//Qmgr::execFAIL_REP()/*******************************//* PRES_TOREQ *//*******************************/void Qmgr::execPRES_TOREQ(Signal* signal) { jamEntry(); BlockReference Tblockref = signal->theData[0]; signal->theData[0] = getOwnNodeId(); signal->theData[1] = ccommitFailureNr; sendSignal(Tblockref, GSN_PRES_TOCONF, signal, 2, JBA); return;}//Qmgr::execPRES_TOREQ()void Qmgr::execREAD_CONFIG_REQ(Signal* signal){ jamEntry(); const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); ndbrequire(p != 0); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(ref, GSN_READ_CONFIG_CONF, signal, ReadConfigConf::SignalLength, JBB);}/*4.2 ADD NODE MODULE*//*##########################################################################*//*4.2.1 STTOR *//**-------------------------------------------------------------------------- * Start phase signal, must be handled by all blocks. * QMGR is only interested in the first phase. * During phase one we clear all registered applications. *---------------------------------------------------------------------------*//*******************************//* STTOR *//*******************************/void Qmgr::execSTTOR(Signal* signal) { jamEntry(); switch(signal->theData[1]){ case 1: initData(signal); startphase1(signal); return; case 7: cactivateApiCheck = 1; /** * Start arbitration thread. This could be done as soon as * we have all nodes (or a winning majority). */ if (cpresident == getOwnNodeId()) handleArbitStart(signal); break; } sendSttorryLab(signal); return;}//Qmgr::execSTTOR()void Qmgr::sendSttorryLab(Signal* signal) {/****************************<*//*< STTORRY <*//****************************<*/ signal->theData[3] = 7; signal->theData[4] = 255; sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB); return;}//Qmgr::sendSttorryLab()void Qmgr::startphase1(Signal* signal) { jamEntry(); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrAss(nodePtr, nodeRec); nodePtr.p->phase = ZSTARTING; nodePtr.p->blockRef = reference(); c_connectedNodes.set(nodePtr.i); signal->theData[0] = 0; // no answer signal->theData[1] = 0; // no id signal->theData[2] = NodeInfo::DB; sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 3, JBB); execCM_INFOCONF(signal); return;}void Qmgr::setHbDelay(UintR aHbDelay){ hb_send_timer.setDelay(aHbDelay < 10 ? 10 : aHbDelay); hb_send_timer.reset(); hb_check_timer.setDelay(aHbDelay < 10 ? 10 : aHbDelay); hb_check_timer.reset();}void Qmgr::setHbApiDelay(UintR aHbApiDelay){ chbApiDelay = (aHbApiDelay < 100 ? 100 : aHbApiDelay); hb_api_timer.setDelay(chbApiDelay); hb_api_timer.reset();}void Qmgr::setArbitTimeout(UintR aArbitTimeout){ arbitRec.timeout = (aArbitTimeout < 10 ? 10 : aArbitTimeout);}void Qmgr::execCONNECT_REP(Signal* signal){ jamEntry(); const Uint32 nodeId = signal->theData[0]; c_connectedNodes.set(nodeId); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrCheckGuard(nodePtr, MAX_NODES, nodeRec); switch(nodePtr.p->phase){ case ZSTARTING: case ZRUNNING: jam(); if(!c_start.m_nodes.isWaitingFor(nodeId)){ jam(); return; } break; case ZPREPARE_FAIL: case ZFAIL_CLOSING: jam(); return; case ZINIT: ndbrequire(false); case ZAPI_ACTIVE: case ZAPI_INACTIVE: return; } switch(c_start.m_gsn){ case GSN_CM_REGREQ: jam(); sendCmRegReq(signal, nodeId); return; case GSN_CM_NODEINFOREQ: jam(); sendCmNodeInfoReq(signal, nodeId, nodePtr.p); return; case GSN_CM_ADD:{ jam(); ndbrequire(getOwnNodeId() != cpresident); c_start.m_nodes.clearWaitingFor(nodeId); c_start.m_gsn = RNIL; NodeRecPtr addNodePtr; addNodePtr.i = nodeId; ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec); cmAddPrepare(signal, addNodePtr, nodePtr.p); return; } default: return; } return;}//Qmgr::execCONNECT_REP()/*******************************//* CM_INFOCONF *//*******************************/void Qmgr::execCM_INFOCONF(Signal* signal) { cpresident = ZNIL; cpresidentCandidate = getOwnNodeId(); cpresidentAlive = ZFALSE; c_stopElectionTime = NdbTick_CurrentMillisecond(); c_stopElectionTime += c_restartPartialTimeout; cmInfoconf010Lab(signal); return;}//Qmgr::execCM_INFOCONF()void Qmgr::cmInfoconf010Lab(Signal* signal) { c_start.m_startKey = 0; c_start.m_startNode = getOwnNodeId(); c_start.m_nodes.clearWaitingFor(); c_start.m_gsn = GSN_CM_REGREQ; NodeRecPtr nodePtr; c_regReqReqSent = c_regReqReqRecv = 0; cnoOfNodes = 0; for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) { jam(); ptrAss(nodePtr, nodeRec); if(getNodeInfo(nodePtr.i).getType() != NodeInfo::DB) continue; c_start.m_nodes.setWaitingFor(nodePtr.i); cnoOfNodes++; if(!c_connectedNodes.get(nodePtr.i)) continue; sendCmRegReq(signal, nodePtr.i); } //---------------------------------------- /* Wait for a while. When it returns */ /* we will check if we got any CM_REGREF*/ /* or CM_REGREQ (lower nodeid than our */ /* own). */ //---------------------------------------- signal->theData[0] = ZREGREQ_TIMELIMIT; signal->theData[1] = c_start.m_startKey; signal->theData[2] = c_start.m_startNode; sendSignalWithDelay(QMGR_REF, GSN_CONTINUEB, signal, 3000, 3); creadyDistCom = ZTRUE; return;}//Qmgr::cmInfoconf010Lab()voidQmgr::sendCmRegReq(Signal * signal, Uint32 nodeId){ c_regReqReqSent++; CmRegReq * const cmRegReq = (CmRegReq *)&signal->theData[0]; cmRegReq->blockRef = reference(); cmRegReq->nodeId = getOwnNodeId(); cmRegReq->version = NDB_VERSION; const Uint32 ref = calcQmgrBlockRef(nodeId); sendSignal(ref, GSN_CM_REGREQ, signal, CmRegReq::SignalLength, JBB); DEBUG_START(GSN_CM_REGREQ, nodeId, "");}/*4.4.11 CM_REGREQ *//**-------------------------------------------------------------------------- * If this signal is received someone tries to get registrated. * Only the president have the authority make decissions about new nodes, * so only a president or a node that claims to be the president may send a * reply to this signal. * This signal can occur any time after that STTOR was received. * CPRESIDENT: Timelimit has expired and someone has * decided to enter the president role * CPRESIDENT_CANDIDATE: * Assigned when we receive a CM_REGREF, if we got more than one REF * then we always keep the lowest nodenumber. * We accept this nodeno as president when our timelimit expires
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -