📄 qmgrmain.cpp
字号:
* We should consider the following cases: * 1- We are the president. If we are busy by adding new nodes to cluster, * then we have to refuse this node to be added. * The refused node will try in ZREFUSE_ADD_TIME seconds again. * If we are not busy then we confirm * * 2- We know the president, we dont bother us about this REQ. * The president has also got this REQ and will take care of it. * * 3- The president are not known. We have received CM_INIT, so we compare the * senders node number to GETOWNNODEID(). * If we have a lower number than the sender then we will claim * that we are the president so we send him a refuse signal back. * We have to wait for the CONTINUEB signal before we can enter the * president role. If our GETOWNNODEID() if larger than sender node number, * we are not the president and just have to wait for the * reply signal (REF) to our CM_REGREQ_2. * 4- We havent received the CM_INIT signal so we don't know who we are. * Ignore the request. *--------------------------------------------------------------------------*//*******************************//* CM_REGREQ *//*******************************/void Qmgr::execCM_REGREQ(Signal* signal) { DEBUG_START3(signal, ""); NodeRecPtr addNodePtr; jamEntry(); CmRegReq * const cmRegReq = (CmRegReq *)&signal->theData[0]; const BlockReference Tblockref = cmRegReq->blockRef; const Uint32 startingVersion = cmRegReq->version; addNodePtr.i = cmRegReq->nodeId; if (creadyDistCom == ZFALSE) { jam(); /* NOT READY FOR DISTRIBUTED COMMUNICATION.*/ return; }//if if (!ndbCompatible_ndb_ndb(NDB_VERSION, startingVersion)) { jam(); sendCmRegrefLab(signal, Tblockref, CmRegRef::ZINCOMPATIBLE_VERSION); return; } ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec); if (cpresident != getOwnNodeId()){ jam(); if (cpresident == ZNIL) { /*** * We don't know the president. * If the node to be added has lower node id * than our president cancidate. Set it as * candidate */ jam(); if (addNodePtr.i < cpresidentCandidate) { jam(); cpresidentCandidate = addNodePtr.i; }//if sendCmRegrefLab(signal, Tblockref, CmRegRef::ZELECTION); return; } /** * We are not the president. * We know the president. * President will answer. */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZNOT_PRESIDENT); return; }//if if (c_start.m_startNode != 0){ jam(); /** * President busy by adding another node */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZBUSY_PRESIDENT); return; }//if if (ctoStatus == Q_ACTIVE) { jam(); /** * Active taking over as president */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZBUSY_TO_PRES); return; }//if if (getNodeInfo(addNodePtr.i).m_type != NodeInfo::DB) { jam(); /** * The new node is not in config file */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZNOT_IN_CFG); return; } Phase phase = addNodePtr.p->phase; if (phase != ZINIT){ jam(); DEBUG("phase = " << phase); sendCmRegrefLab(signal, Tblockref, CmRegRef::ZNOT_DEAD); return; }//if jam(); /** * WE ARE PRESIDENT AND WE ARE NOT BUSY ADDING ANOTHER NODE. * WE WILL TAKE CARE OF THE INCLUSION OF THIS NODE INTO THE CLUSTER. * WE NEED TO START TIME SUPERVISION OF THIS. SINCE WE CANNOT STOP * TIMED SIGNAL IF THE INCLUSION IS INTERRUPTED WE IDENTIFY * EACH INCLUSION WITH A UNIQUE IDENTITY. THIS IS CHECKED WHEN * THE SIGNAL ARRIVES. IF IT HAS CHANGED THEN WE SIMPLY IGNORE * THE TIMED SIGNAL. */ /** * Update start record */ c_start.m_startKey++; c_start.m_startNode = addNodePtr.i; /** * Assign dynamic id */ UintR TdynId = ++c_maxDynamicId; setNodeInfo(addNodePtr.i).m_version = startingVersion; addNodePtr.p->ndynamicId = TdynId; /** * Reply with CM_REGCONF */ CmRegConf * const cmRegConf = (CmRegConf *)&signal->theData[0]; cmRegConf->presidentBlockRef = reference(); cmRegConf->presidentNodeId = getOwnNodeId(); cmRegConf->presidentVersion = getNodeInfo(getOwnNodeId()).m_version; cmRegConf->dynamicId = TdynId; c_clusterNodes.copyto(NdbNodeBitmask::Size, cmRegConf->allNdbNodes); sendSignal(Tblockref, GSN_CM_REGCONF, signal, CmRegConf::SignalLength, JBA); DEBUG_START(GSN_CM_REGCONF, refToNode(Tblockref), ""); /** * Send CmAdd to all nodes (including starting) */ c_start.m_nodes = c_clusterNodes; c_start.m_nodes.setWaitingFor(addNodePtr.i); c_start.m_gsn = GSN_CM_ADD; NodeReceiverGroup rg(QMGR, c_start.m_nodes); CmAdd * const cmAdd = (CmAdd*)signal->getDataPtrSend(); cmAdd->requestType = CmAdd::Prepare; cmAdd->startingNodeId = addNodePtr.i; cmAdd->startingVersion = startingVersion; sendSignal(rg, GSN_CM_ADD, signal, CmAdd::SignalLength, JBA); DEBUG_START2(GSN_CM_ADD, rg, "Prepare"); /** * Set timer */ return; signal->theData[0] = ZREGREQ_MASTER_TIMELIMIT; signal->theData[1] = c_start.m_startKey; sendSignalWithDelay(QMGR_REF, GSN_CONTINUEB, signal, 30000, 2); return;}//Qmgr::execCM_REGREQ()void Qmgr::sendCmRegrefLab(Signal* signal, BlockReference TBRef, CmRegRef::ErrorCode Terror) { CmRegRef* ref = (CmRegRef*)signal->getDataPtrSend(); ref->blockRef = reference(); ref->nodeId = getOwnNodeId(); ref->errorCode = Terror; ref->presidentCandidate = (cpresident == ZNIL ? cpresidentCandidate : cpresident); sendSignal(TBRef, GSN_CM_REGREF, signal, CmRegRef::SignalLength, JBB); DEBUG_START(GSN_CM_REGREF, refToNode(TBRef), ""); return;}//Qmgr::sendCmRegrefLab()/*4.4.11 CM_REGCONF *//**-------------------------------------------------------------------------- * President gives permission to a node which wants to join the cluster. * The president will prepare the cluster that a new node will be added to * cluster. When the new node has set up all connections to the cluster, * the president will send commit to all clusternodes so the phase of the * new node can be changed to ZRUNNING. *--------------------------------------------------------------------------*//*******************************//* CM_REGCONF *//*******************************/void Qmgr::execCM_REGCONF(Signal* signal) { DEBUG_START3(signal, ""); NodeRecPtr myNodePtr; NodeRecPtr nodePtr; jamEntry(); const CmRegConf * const cmRegConf = (CmRegConf *)&signal->theData[0]; if (!ndbCompatible_ndb_ndb(NDB_VERSION, cmRegConf->presidentVersion)) { jam(); char buf[128]; BaseString::snprintf(buf,sizeof(buf),"incompatible version own=0x%x other=0x%x, shutting down", NDB_VERSION, cmRegConf->presidentVersion); systemErrorLab(signal, __LINE__, buf); return; } cpdistref = cmRegConf->presidentBlockRef; cpresident = cmRegConf->presidentNodeId; UintR TdynamicId = cmRegConf->dynamicId; c_maxDynamicId = TdynamicId; c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes);/*--------------------------------------------------------------*/// Send this as an EVENT REPORT to inform about hearing about// other NDB node proclaiming to be president./*--------------------------------------------------------------*/ signal->theData[0] = NDB_LE_CM_REGCONF; signal->theData[1] = getOwnNodeId(); signal->theData[2] = cpresident; signal->theData[3] = TdynamicId; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); myNodePtr.i = getOwnNodeId(); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); myNodePtr.p->ndynamicId = TdynamicId; for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) { jam(); if (c_clusterNodes.get(nodePtr.i)){ jam(); ptrAss(nodePtr, nodeRec); ndbrequire(nodePtr.p->phase == ZINIT); nodePtr.p->phase = ZRUNNING; if(c_connectedNodes.get(nodePtr.i)){ jam(); sendCmNodeInfoReq(signal, nodePtr.i, myNodePtr.p); } } } c_start.m_gsn = GSN_CM_NODEINFOREQ; c_start.m_nodes = c_clusterNodes; return;}//Qmgr::execCM_REGCONF()voidQmgr::sendCmNodeInfoReq(Signal* signal, Uint32 nodeId, const NodeRec * self){ CmNodeInfoReq * const req = (CmNodeInfoReq*)signal->getDataPtrSend(); req->nodeId = getOwnNodeId(); req->dynamicId = self->ndynamicId; req->version = getNodeInfo(getOwnNodeId()).m_version; const Uint32 ref = calcQmgrBlockRef(nodeId); sendSignal(ref,GSN_CM_NODEINFOREQ, signal, CmNodeInfoReq::SignalLength, JBB); DEBUG_START(GSN_CM_NODEINFOREQ, nodeId, "");}/*4.4.11 CM_REGREF *//**-------------------------------------------------------------------------- * Only a president or a president candidate can refuse a node to get added to * the cluster. * Refuse reasons: * ZBUSY We know that the sender is the president and we have to * make a new CM_REGREQ. * ZNOT_IN_CFG This node number is not specified in the configfile, * SYSTEM ERROR * ZELECTION Sender is a president candidate, his timelimit * hasn't expired so maybe someone else will show up. * Update the CPRESIDENT_CANDIDATE, then wait for our * timelimit to expire. *---------------------------------------------------------------------------*//*******************************//* CM_REGREF *//*******************************/void Qmgr::execCM_REGREF(Signal* signal) { jamEntry(); c_regReqReqRecv++; // Ignore block reference in data[0] UintR TaddNodeno = signal->theData[1]; UintR TrefuseReason = signal->theData[2]; Uint32 candidate = signal->theData[3]; DEBUG_START3(signal, TrefuseReason); if(candidate != cpresidentCandidate){ jam(); c_regReqReqRecv = ~0; } switch (TrefuseReason) { case CmRegRef::ZINCOMPATIBLE_VERSION: jam(); systemErrorLab(signal, __LINE__, "incompatible version, connection refused by running ndb node"); break; case CmRegRef::ZBUSY: case CmRegRef::ZBUSY_TO_PRES: case CmRegRef::ZBUSY_PRESIDENT: jam(); cpresidentAlive = ZTRUE; signal->theData[3] = 0; break; case CmRegRef::ZNOT_IN_CFG: jam(); progError(__LINE__, NDBD_EXIT_NODE_NOT_IN_CONFIG); break; case CmRegRef::ZNOT_DEAD: jam(); progError(__LINE__, NDBD_EXIT_NODE_NOT_DEAD); break; case CmRegRef::ZELECTION: jam(); if (cpresidentCandidate > TaddNodeno) { jam(); //---------------------------------------- /* We may already have a candidate */ /* choose the lowest nodeno */ //---------------------------------------- signal->theData[3] = 2; cpresidentCandidate = TaddNodeno; } else { signal->theData[3] = 4; }//if break; case CmRegRef::ZNOT_PRESIDENT: jam(); cpresidentAlive = ZTRUE; signal->theData[3] = 3; break; default: jam(); signal->theData[3] = 5; /*empty*/; break; }//switch/*--------------------------------------------------------------*/// Send this as an EVENT REPORT to inform about hearing about// other NDB node proclaiming not to be president./*--------------------------------------------------------------*/ signal->theData[0] = NDB_LE_CM_REGREF; signal->theData[1] = getOwnNodeId(); signal->theData[2] = TaddNodeno;//-----------------------------------------// signal->theData[3] filled in above//----------------------------------------- sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); if(cpresidentAlive == ZTRUE){ jam(); DEBUG(""); return; } if(c_regReqReqSent != c_regReqReqRecv){ jam(); DEBUG( c_regReqReqSent << " != " << c_regReqReqRecv); return; } if(cpresidentCandidate != getOwnNodeId()){ jam(); DEBUG(""); return; } /** * All configured nodes has agreed */ Uint64 now = NdbTick_CurrentMillisecond(); if((c_regReqReqRecv == cnoOfNodes) || now > c_stopElectionTime){ jam(); electionWon(); sendSttorryLab(signal); /** * Start timer handling */ signal->theData[0] = ZTIMER_HANDLING; sendSignal(QMGR_REF, GSN_CONTINUEB, signal, 10, JBB); } return;}//Qmgr::execCM_REGREF()voidQmgr::electionWon(){ NodeRecPtr myNodePtr; cpresident = getOwnNodeId(); /* This node becomes president. */ myNodePtr.i = getOwnNodeId(); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); myNodePtr.p->phase = ZRUNNING; cpdistref = reference(); cneighbourl = ZNIL; cneighbourh = ZNIL; myNodePtr.p->ndynamicId = 1; c_maxDynamicId = 1; c_clusterNodes.clear(); c_clusterNodes.set(getOwnNodeId());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -