📄 qmgrmain.cpp
字号:
refToNode(signal->getSendersBlockRef()), GSN_READ_NODESCONF);}voidQmgr::execREAD_NODESREF(Signal* signal){ check_readnodes_reply(signal, refToNode(signal->getSendersBlockRef()), GSN_READ_NODESREF);}/*******************************//* CM_INFOCONF *//*******************************/void Qmgr::execCM_INFOCONF(Signal* signal) { /** * Open communcation to all DB nodes */ signal->theData[0] = 0; // no answer signal->theData[1] = 0; // no id signal->theData[2] = NodeInfo::DB; sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 3, JBB); cpresident = ZNIL; cpresidentAlive = ZFALSE; c_start_election_time = NdbTick_CurrentMillisecond(); signal->theData[0] = ZSTART_FAILURE_LIMIT; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 3000, 1); cmInfoconf010Lab(signal); return;}//Qmgr::execCM_INFOCONF()Uint32 g_start_type = 0;NdbNodeBitmask g_nowait_nodes; // Set by clovoid Qmgr::cmInfoconf010Lab(Signal* signal) { c_start.m_startKey = 0; c_start.m_startNode = getOwnNodeId(); c_start.m_nodes.clearWaitingFor(); c_start.m_gsn = GSN_CM_REGREQ; c_start.m_starting_nodes.clear(); c_start.m_starting_nodes_w_log.clear(); c_start.m_regReqReqSent = 0; c_start.m_regReqReqRecv = 0; c_start.m_skip_nodes = g_nowait_nodes; c_start.m_skip_nodes.bitAND(c_definedNodes); c_start.m_start_type = g_start_type; NodeRecPtr nodePtr; cnoOfNodes = 0; for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) { jam(); ptrAss(nodePtr, nodeRec); if(getNodeInfo(nodePtr.i).getType() != NodeInfo::DB) continue; c_start.m_nodes.setWaitingFor(nodePtr.i); cnoOfNodes++; if(!c_connectedNodes.get(nodePtr.i)) continue; sendCmRegReq(signal, nodePtr.i); } //---------------------------------------- /* Wait for a while. When it returns */ /* we will check if we got any CM_REGREF*/ /* or CM_REGREQ (lower nodeid than our */ /* own). */ //---------------------------------------- signal->theData[0] = ZREGREQ_TIMELIMIT; signal->theData[1] = c_start.m_startKey; signal->theData[2] = c_start.m_startNode; sendSignalWithDelay(QMGR_REF, GSN_CONTINUEB, signal, 3000, 3); creadyDistCom = ZTRUE; return;}//Qmgr::cmInfoconf010Lab()voidQmgr::sendCmRegReq(Signal * signal, Uint32 nodeId){ CmRegReq * req = (CmRegReq *)&signal->theData[0]; req->blockRef = reference(); req->nodeId = getOwnNodeId(); req->version = NDB_VERSION; req->latest_gci = c_start.m_latest_gci; req->start_type = c_start.m_start_type; c_start.m_skip_nodes.copyto(NdbNodeBitmask::Size, req->skip_nodes); const Uint32 ref = calcQmgrBlockRef(nodeId); sendSignal(ref, GSN_CM_REGREQ, signal, CmRegReq::SignalLength, JBB); DEBUG_START(GSN_CM_REGREQ, nodeId, ""); c_start.m_regReqReqSent++;}/*4.4.11 CM_REGREQ *//**-------------------------------------------------------------------------- * If this signal is received someone tries to get registrated. * Only the president have the authority make decissions about new nodes, * so only a president or a node that claims to be the president may send a * reply to this signal. * This signal can occur any time after that STTOR was received. * CPRESIDENT: Timelimit has expired and someone has * decided to enter the president role * CPRESIDENT_CANDIDATE: * Assigned when we receive a CM_REGREF, if we got more than one REF * then we always keep the lowest nodenumber. * We accept this nodeno as president when our timelimit expires * We should consider the following cases: * 1- We are the president. If we are busy by adding new nodes to cluster, * then we have to refuse this node to be added. * The refused node will try in ZREFUSE_ADD_TIME seconds again. * If we are not busy then we confirm * * 2- We know the president, we dont bother us about this REQ. * The president has also got this REQ and will take care of it. * * 3- The president are not known. We have received CM_INIT, so we compare the * senders node number to GETOWNNODEID(). * If we have a lower number than the sender then we will claim * that we are the president so we send him a refuse signal back. * We have to wait for the CONTINUEB signal before we can enter the * president role. If our GETOWNNODEID() if larger than sender node number, * we are not the president and just have to wait for the * reply signal (REF) to our CM_REGREQ_2. * 4- We havent received the CM_INIT signal so we don't know who we are. * Ignore the request. *--------------------------------------------------------------------------*//*******************************//* CM_REGREQ *//*******************************/staticintcheck_start_type(Uint32 starting, Uint32 own){ if (starting == (1 << NodeState::ST_INITIAL_START) && ((own & (1 << NodeState::ST_INITIAL_START)) == 0)) { return 1; } return 0;}void Qmgr::execCM_REGREQ(Signal* signal) { DEBUG_START3(signal, ""); NodeRecPtr addNodePtr; jamEntry(); CmRegReq * const cmRegReq = (CmRegReq *)&signal->theData[0]; const BlockReference Tblockref = cmRegReq->blockRef; const Uint32 startingVersion = cmRegReq->version; addNodePtr.i = cmRegReq->nodeId; Uint32 gci = 1; Uint32 start_type = ~0; NdbNodeBitmask skip_nodes; if (signal->getLength() == CmRegReq::SignalLength) { jam(); gci = cmRegReq->latest_gci; start_type = cmRegReq->start_type; skip_nodes.assign(NdbNodeBitmask::Size, cmRegReq->skip_nodes); } if (creadyDistCom == ZFALSE) { jam(); /* NOT READY FOR DISTRIBUTED COMMUNICATION.*/ return; }//if if (!ndbCompatible_ndb_ndb(NDB_VERSION, startingVersion)) { jam(); sendCmRegrefLab(signal, Tblockref, CmRegRef::ZINCOMPATIBLE_VERSION); return; } if (check_start_type(start_type, c_start.m_start_type)) { jam(); sendCmRegrefLab(signal, Tblockref, CmRegRef::ZINCOMPATIBLE_START_TYPE); return; } if (cpresident != getOwnNodeId()) { jam(); if (cpresident == ZNIL) { /*** * We don't know the president. * If the node to be added has lower node id * than our president cancidate. Set it as * candidate */ jam(); if (gci > c_start.m_president_candidate_gci || (gci == c_start.m_president_candidate_gci && addNodePtr.i < c_start.m_president_candidate)) { jam(); c_start.m_president_candidate = addNodePtr.i; c_start.m_president_candidate_gci = gci; } sendCmRegrefLab(signal, Tblockref, CmRegRef::ZELECTION); return; } /** * We are not the president. * We know the president. * President will answer. */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZNOT_PRESIDENT); return; }//if if (c_start.m_startNode != 0) { jam(); /** * President busy by adding another node */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZBUSY_PRESIDENT); return; }//if if (ctoStatus == Q_ACTIVE) { jam(); /** * Active taking over as president */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZBUSY_TO_PRES); return; }//if if (getNodeInfo(addNodePtr.i).m_type != NodeInfo::DB) { jam(); /** * The new node is not in config file */ sendCmRegrefLab(signal, Tblockref, CmRegRef::ZNOT_IN_CFG); return; } ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec); Phase phase = addNodePtr.p->phase; if (phase != ZINIT) { jam(); DEBUG("phase = " << phase); sendCmRegrefLab(signal, Tblockref, CmRegRef::ZNOT_DEAD); return; } jam(); /** * WE ARE PRESIDENT AND WE ARE NOT BUSY ADDING ANOTHER NODE. * WE WILL TAKE CARE OF THE INCLUSION OF THIS NODE INTO THE CLUSTER. * WE NEED TO START TIME SUPERVISION OF THIS. SINCE WE CANNOT STOP * TIMED SIGNAL IF THE INCLUSION IS INTERRUPTED WE IDENTIFY * EACH INCLUSION WITH A UNIQUE IDENTITY. THIS IS CHECKED WHEN * THE SIGNAL ARRIVES. IF IT HAS CHANGED THEN WE SIMPLY IGNORE * THE TIMED SIGNAL. */ /** * Update start record */ c_start.m_startKey++; c_start.m_startNode = addNodePtr.i; /** * Assign dynamic id */ UintR TdynId = ++c_maxDynamicId; setNodeInfo(addNodePtr.i).m_version = startingVersion; addNodePtr.p->ndynamicId = TdynId; /** * Reply with CM_REGCONF */ CmRegConf * const cmRegConf = (CmRegConf *)&signal->theData[0]; cmRegConf->presidentBlockRef = reference(); cmRegConf->presidentNodeId = getOwnNodeId(); cmRegConf->presidentVersion = getNodeInfo(getOwnNodeId()).m_version; cmRegConf->dynamicId = TdynId; c_clusterNodes.copyto(NdbNodeBitmask::Size, cmRegConf->allNdbNodes); sendSignal(Tblockref, GSN_CM_REGCONF, signal, CmRegConf::SignalLength, JBA); DEBUG_START(GSN_CM_REGCONF, refToNode(Tblockref), ""); /** * Send CmAdd to all nodes (including starting) */ c_start.m_nodes = c_clusterNodes; c_start.m_nodes.setWaitingFor(addNodePtr.i); c_start.m_gsn = GSN_CM_ADD; NodeReceiverGroup rg(QMGR, c_start.m_nodes); CmAdd * const cmAdd = (CmAdd*)signal->getDataPtrSend(); cmAdd->requestType = CmAdd::Prepare; cmAdd->startingNodeId = addNodePtr.i; cmAdd->startingVersion = startingVersion; sendSignal(rg, GSN_CM_ADD, signal, CmAdd::SignalLength, JBA); DEBUG_START2(GSN_CM_ADD, rg, "Prepare"); /** * Set timer */ return; signal->theData[0] = ZREGREQ_MASTER_TIMELIMIT; signal->theData[1] = c_start.m_startKey; sendSignalWithDelay(QMGR_REF, GSN_CONTINUEB, signal, 30000, 2); return;}//Qmgr::execCM_REGREQ()void Qmgr::sendCmRegrefLab(Signal* signal, BlockReference TBRef, CmRegRef::ErrorCode Terror) { CmRegRef* ref = (CmRegRef*)signal->getDataPtrSend(); ref->blockRef = reference(); ref->nodeId = getOwnNodeId(); ref->errorCode = Terror; ref->presidentCandidate = (cpresident == ZNIL ? c_start.m_president_candidate : cpresident); ref->candidate_latest_gci = c_start.m_president_candidate_gci; ref->latest_gci = c_start.m_latest_gci; ref->start_type = c_start.m_start_type; c_start.m_skip_nodes.copyto(NdbNodeBitmask::Size, ref->skip_nodes); sendSignal(TBRef, GSN_CM_REGREF, signal, CmRegRef::SignalLength, JBB); DEBUG_START(GSN_CM_REGREF, refToNode(TBRef), ""); return;}//Qmgr::sendCmRegrefLab()/*4.4.11 CM_REGCONF *//**-------------------------------------------------------------------------- * President gives permission to a node which wants to join the cluster. * The president will prepare the cluster that a new node will be added to * cluster. When the new node has set up all connections to the cluster, * the president will send commit to all clusternodes so the phase of the * new node can be changed to ZRUNNING. *--------------------------------------------------------------------------*//*******************************//* CM_REGCONF *//*******************************/void Qmgr::execCM_REGCONF(Signal* signal) { DEBUG_START3(signal, ""); NodeRecPtr myNodePtr; NodeRecPtr nodePtr; jamEntry(); const CmRegConf * const cmRegConf = (CmRegConf *)&signal->theData[0]; Uint32 presidentNodeId = cmRegConf->presidentNodeId; if (!ndbCompatible_ndb_ndb(NDB_VERSION, cmRegConf->presidentVersion)) { jam(); char buf[128]; BaseString::snprintf(buf,sizeof(buf), "incompatible version own=0x%x other=0x%x, " " shutting down", NDB_VERSION, cmRegConf->presidentVersion); systemErrorLab(signal, __LINE__, buf); return; } myNodePtr.i = getOwnNodeId(); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); ndbrequire(c_start.m_gsn == GSN_CM_REGREQ); ndbrequire(myNodePtr.p->phase = ZSTARTING); cpdistref = cmRegConf->presidentBlockRef; cpresident = cmRegConf->presidentNodeId; UintR TdynamicId = cmRegConf->dynamicId; c_maxDynamicId = TdynamicId; c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes); myNodePtr.p->ndynamicId = TdynamicId; /*--------------------------------------------------------------*/// Send this as an EVENT REPORT to inform about hearing about// other NDB node proclaiming to be president./*--------------------------------------------------------------*/ signal->theData[0] = NDB_LE_CM_REGCONF; signal->theData[1] = getOwnNodeId(); signal->theData[2] = cpresident; signal->theData[3] = TdynamicId; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) { jam(); if (c_clusterNodes.get(nodePtr.i)){ jam(); ptrAss(nodePtr, nodeRec); ndbrequire(nodePtr.p->phase == ZINIT); nodePtr.p->phase = ZRUNNING; if(c_connectedNodes.get(nodePtr.i)){ jam(); sendCmNodeInfoReq(signal, nodePtr.i, myNodePtr.p); } } } c_start.m_gsn = GSN_CM_NODEINFOREQ; c_start.m_nodes = c_clusterNodes; return;}//Qmgr::execCM_REGCONF()voidQmgr::check_readnodes_reply(Signal* signal, Uint32 nodeId, Uint32 gsn){ NodeRecPtr myNodePtr; myNodePtr.i = getOwnNodeId(); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); NodeRecPtr nodePtr; nodePtr.i = nodeId; ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -