📄 qmgrmain.cpp
字号:
goto start_report; } } ndbrequire(false);start_report: jam(); { Uint32 sz = NdbNodeBitmask::Size; signal->theData[0] = NDB_LE_StartReport; signal->theData[3] = sz; Uint32* ptr = signal->theData+4; c_definedNodes.copyto(sz, ptr); ptr += sz; c_start.m_starting_nodes.copyto(sz, ptr); ptr += sz; c_start.m_skip_nodes.copyto(sz, ptr); ptr += sz; report_mask.copyto(sz, ptr); ptr+= sz; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4+4*NdbNodeBitmask::Size, JBB); } return retVal; missing_nodegroup: jam(); char buf[100], mask1[100], mask2[100]; c_start.m_starting_nodes.getText(mask1); tmp.assign(c_start.m_starting_nodes); tmp.bitANDC(c_start.m_starting_nodes_w_log); tmp.getText(mask2); BaseString::snprintf(buf, sizeof(buf), "Unable to start missing node group! " " starting: %s (missing fs for: %s)", mask1, mask2); progError(__LINE__, NDBD_EXIT_SR_RESTARTCONFLICT, buf);}voidQmgr::electionWon(Signal* signal){ NodeRecPtr myNodePtr; cpresident = getOwnNodeId(); /* This node becomes president. */ myNodePtr.i = getOwnNodeId(); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); myNodePtr.p->phase = ZRUNNING; cpdistref = reference(); cneighbourl = ZNIL; cneighbourh = ZNIL; myNodePtr.p->ndynamicId = 1; c_maxDynamicId = 1; c_clusterNodes.clear(); c_clusterNodes.set(getOwnNodeId()); cpresidentAlive = ZTRUE; c_start_election_time = ~0; c_start.reset(); signal->theData[0] = NDB_LE_CM_REGCONF; signal->theData[1] = getOwnNodeId(); signal->theData[2] = cpresident; signal->theData[3] = 1; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); c_start.m_starting_nodes.clear(getOwnNodeId()); if (c_start.m_starting_nodes.isclear()) { jam(); sendSttorryLab(signal); }}/*4.4.11 CONTINUEB *//*--------------------------------------------------------------------------*//* *//*--------------------------------------------------------------------------*//****************************>---------------------------------------------*//* CONTINUEB > SENDER: Own block, Own node *//****************************>-------+INPUT : TCONTINUEB_TYPE *//*--------------------------------------------------------------*/void Qmgr::regreqTimeLimitLab(Signal* signal) { if(cpresident == ZNIL) { if (c_start.m_president_candidate == ZNIL) { jam(); c_start.m_president_candidate = getOwnNodeId(); } cmInfoconf010Lab(signal); }}//Qmgr::regreqTimelimitLab()/**--------------------------------------------------------------------------- * The new node will take care of giving information about own node and ask * all other nodes for nodeinfo. The new node will use CM_NODEINFOREQ for * that purpose. When the setup of connections to all running, the president * will send a commit to all running nodes + the new node * INPUT: NODE_PTR1, must be set as ZNIL if we don't enter CONNECT_NODES) * from signal CM_NODEINFOCONF. *---------------------------------------------------------------------------*//*******************************//* CM_NODEINFOCONF *//*******************************/void Qmgr::execCM_NODEINFOCONF(Signal* signal) { DEBUG_START3(signal, ""); jamEntry(); CmNodeInfoConf * const conf = (CmNodeInfoConf*)signal->getDataPtr(); const Uint32 nodeId = conf->nodeId; const Uint32 dynamicId = conf->dynamicId; const Uint32 version = conf->version; NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrAss(nodePtr, nodeRec); ndbrequire(nodePtr.p->phase == ZSTARTING); ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ); c_start.m_nodes.clearWaitingFor(nodeId); /** * Update node info */ NodeRecPtr replyNodePtr; replyNodePtr.i = nodeId; ptrCheckGuard(replyNodePtr, MAX_NDB_NODES, nodeRec); replyNodePtr.p->ndynamicId = dynamicId; replyNodePtr.p->blockRef = signal->getSendersBlockRef(); setNodeInfo(replyNodePtr.i).m_version = version; if(!c_start.m_nodes.done()){ jam(); return; } /**********************************************<*/ /* Send an ack. back to the president. */ /* CM_ACKADD */ /* The new node has been registered by all */ /* running nodes and has stored nodeinfo about */ /* all running nodes. The new node has to wait */ /* for CM_ADD (commit) from president to become */ /* a running node in the cluster. */ /**********************************************<*/ sendCmAckAdd(signal, getOwnNodeId(), CmAdd::Prepare); return;}//Qmgr::execCM_NODEINFOCONF()/**--------------------------------------------------------------------------- * A new node sends nodeinfo about himself. The new node asks for * corresponding nodeinfo back in the CM_NODEINFOCONF. *---------------------------------------------------------------------------*//*******************************//* CM_NODEINFOREQ *//*******************************/void Qmgr::execCM_NODEINFOREQ(Signal* signal) { jamEntry(); const Uint32 Tblockref = signal->getSendersBlockRef(); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrAss(nodePtr, nodeRec); if(nodePtr.p->phase != ZRUNNING){ jam(); signal->theData[0] = reference(); signal->theData[1] = getOwnNodeId(); signal->theData[2] = ZNOT_RUNNING; sendSignal(Tblockref, GSN_CM_NODEINFOREF, signal, 3, JBB); return; } NodeRecPtr addNodePtr; CmNodeInfoReq * const req = (CmNodeInfoReq*)signal->getDataPtr(); addNodePtr.i = req->nodeId; ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec); addNodePtr.p->ndynamicId = req->dynamicId; addNodePtr.p->blockRef = signal->getSendersBlockRef(); setNodeInfo(addNodePtr.i).m_version = req->version; c_maxDynamicId = req->dynamicId; cmAddPrepare(signal, addNodePtr, nodePtr.p);}//Qmgr::execCM_NODEINFOREQ()voidQmgr::cmAddPrepare(Signal* signal, NodeRecPtr nodePtr, const NodeRec * self){ jam(); switch(nodePtr.p->phase){ case ZINIT: jam(); nodePtr.p->phase = ZSTARTING; return; case ZFAIL_CLOSING: jam(); #if 1 warningEvent("Recieved request to incorperate node %u, " "while error handling has not yet completed", nodePtr.i); ndbrequire(getOwnNodeId() != cpresident); ndbrequire(signal->header.theVerId_signalNumber == GSN_CM_ADD); c_start.m_nodes.clearWaitingFor(); c_start.m_nodes.setWaitingFor(nodePtr.i); c_start.m_gsn = GSN_CM_NODEINFOCONF;#else warningEvent("Enabling communication to CM_ADD node %u state=%d", nodePtr.i, nodePtr.p->phase); nodePtr.p->phase = ZSTARTING; nodePtr.p->failState = NORMAL; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA);#endif return; case ZSTARTING: break; case ZRUNNING: case ZPREPARE_FAIL: case ZAPI_ACTIVE: case ZAPI_INACTIVE: ndbrequire(false); } sendCmAckAdd(signal, nodePtr.i, CmAdd::Prepare); /* President have prepared us */ CmNodeInfoConf * conf = (CmNodeInfoConf*)signal->getDataPtrSend(); conf->nodeId = getOwnNodeId(); conf->dynamicId = self->ndynamicId; conf->version = getNodeInfo(getOwnNodeId()).m_version; sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal, CmNodeInfoConf::SignalLength, JBB); DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), "");}voidQmgr::sendCmAckAdd(Signal * signal, Uint32 nodeId, CmAdd::RequestType type){ CmAckAdd * cmAckAdd = (CmAckAdd*)signal->getDataPtrSend(); cmAckAdd->requestType = type; cmAckAdd->startingNodeId = nodeId; cmAckAdd->senderNodeId = getOwnNodeId(); sendSignal(cpdistref, GSN_CM_ACKADD, signal, CmAckAdd::SignalLength, JBA); DEBUG_START(GSN_CM_ACKADD, cpresident, ""); switch(type){ case CmAdd::Prepare: return; case CmAdd::AddCommit: case CmAdd::CommitNew: break; } signal->theData[0] = nodeId; EXECUTE_DIRECT(NDBCNTR, GSN_CM_ADD_REP, signal, 1); jamEntry();}/*4.4.11 CM_ADD *//**-------------------------------------------------------------------------- * Prepare a running node to add a new node to the cluster. The running node * will change phase of the new node fron ZINIT to ZWAITING. The running node * will also mark that we have received a prepare. When the new node has sent * us nodeinfo we can send an acknowledgement back to the president. When all * running nodes has acknowledged the new node, the president will send a * commit and we can change phase of the new node to ZRUNNING. The president * will also send CM_ADD to himself. *---------------------------------------------------------------------------*//*******************************//* CM_ADD *//*******************************/void Qmgr::execCM_ADD(Signal* signal) { NodeRecPtr addNodePtr; jamEntry(); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec); CmAdd * const cmAdd = (CmAdd*)signal->getDataPtr(); const CmAdd::RequestType type = (CmAdd::RequestType)cmAdd->requestType; addNodePtr.i = cmAdd->startingNodeId; //const Uint32 startingVersion = cmAdd->startingVersion; ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec); DEBUG_START3(signal, type); if(nodePtr.p->phase == ZSTARTING){ jam(); /** * We are joining... */ ndbrequire(addNodePtr.i == nodePtr.i); switch(type){ case CmAdd::Prepare: ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ); /** * Wait for CM_NODEINFO_CONF */ return; case CmAdd::CommitNew: /** * Tata. we're in the cluster */ joinedCluster(signal, addNodePtr); return; case CmAdd::AddCommit: ndbrequire(false); } } switch (type) { case CmAdd::Prepare: cmAddPrepare(signal, addNodePtr, nodePtr.p); break; case CmAdd::AddCommit:{ jam(); ndbrequire(addNodePtr.p->phase == ZSTARTING); addNodePtr.p->phase = ZRUNNING; setNodeInfo(addNodePtr.i).m_heartbeat_cnt= 0; c_clusterNodes.set(addNodePtr.i); findNeighbours(signal); /** * SEND A HEARTBEAT IMMEDIATELY TO DECREASE THE RISK THAT WE MISS EARLY * HEARTBEATS. */ sendHeartbeat(signal); /** * ENABLE COMMUNICATION WITH ALL BLOCKS WITH THE NEWLY ADDED NODE */ signal->theData[0] = addNodePtr.i; sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA); sendCmAckAdd(signal, addNodePtr.i, CmAdd::AddCommit); if(getOwnNodeId() != cpresident){ jam(); c_start.reset(); } break; } case CmAdd::CommitNew: jam(); ndbrequire(false); }}//Qmgr::execCM_ADD()voidQmgr::joinedCluster(Signal* signal, NodeRecPtr nodePtr){ /** * WE HAVE BEEN INCLUDED IN THE CLUSTER WE CAN START BEING PART OF THE * HEARTBEAT PROTOCOL AND WE WILL ALSO ENABLE COMMUNICATION WITH ALL * NODES IN THE CLUSTER. */ nodePtr.p->phase = ZRUNNING; setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; findNeighbours(signal); c_clusterNodes.set(nodePtr.i); c_start.reset(); /** * SEND A HEARTBEAT IMMEDIATELY TO DECREASE THE RISK * THAT WE MISS EARLY HEARTBEATS. */ sendHeartbeat(signal); /** * ENABLE COMMUNICATION WITH ALL BLOCKS IN THE CURRENT CLUSTER AND SET * THE NODES IN THE CLUSTER TO BE RUNNING. */ for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) { jam(); ptrAss(nodePtr, nodeRec); if ((nodePtr.p->phase == ZRUNNING) && (nodePtr.i != getOwnNodeId())) { /*-------------------------------------------------------------------*/ // Enable full communication to all other nodes. Not really necessary // to open communication to ourself. /*-------------------------------------------------------------------*/ jam(); signal->theData[0] = nodePtr.i; sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA); }//if }//for sendSttorryLab(signal); /** * Start timer handling */ signal->theData[0] = ZTIMER_HANDLING; sendSignal(QMGR_REF, GSN_CONTINUEB, signal, 10, JBB); sendCmAckAdd(signal, getOwnNodeId(), CmAdd::CommitNew);}/* 4.10.7 CM_ACKADD - PRESIDENT IS RECEIVER - *//*---------------------------------------------------------------------------*//* Entry point for an ack add signal. * The TTYPE defines if it is a prepare or a commit. *//*---------------------------------------------------------------------------*/void Qmgr::execCM_ACKADD(Signal* signal) { NodeRecPtr addNodePtr; NodeRecPtr senderNodePtr; jamEntry(); CmAckAdd * const cmAckAdd = (CmAckAdd*)signal->getDataPtr(); const CmAdd::RequestType type = (CmAdd::RequestType)cmAckAdd->requestType; addNodePtr.i = cmAckAdd->startingNodeId; senderNodePtr.i = cmAckAdd->senderNodeId; DEBUG_START3(signal, type); if (cpresident != getOwnNodeId()) { jam(); /*-----------------------------------------------------------------------*/ /* IF WE ARE NOT PRESIDENT THEN WE SHOULD NOT RECEIVE THIS MESSAGE. */ /*------------------------------------------------------------_----------*/ warningEvent("Received CM_ACKADD from %d president=%d", senderNodePtr.i, cpresident); return; }//if if (addNodePtr.i != c_start.m_startNode) { jam(); /*----------------------------------------------------------------------*/ /* THIS IS NOT THE STARTING NODE. WE ARE ACTIVE NOW WITH ANOTHER START. */ /*----------------------------------------------------------------------*/ warningEvent("Received CM_ACKADD from %d with startNode=%d != own %d", senderNodePtr.i, addNodePtr.i, c_start.m_startNode);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -