📄 qmgrmain.cpp
字号:
return; }//if ndbrequire(c_start.m_gsn == GSN_CM_ADD); c_start.m_nodes.clearWaitingFor(senderNodePtr.i); if(!c_start.m_nodes.done()){ jam(); return; } switch (type) { case CmAdd::Prepare:{ jam(); /*----------------------------------------------------------------------*/ /* ALL RUNNING NODES HAVE PREPARED THE INCLUSION OF THIS NEW NODE. */ /*----------------------------------------------------------------------*/ c_start.m_gsn = GSN_CM_ADD; c_start.m_nodes = c_clusterNodes; CmAdd * const cmAdd = (CmAdd*)signal->getDataPtrSend(); cmAdd->requestType = CmAdd::AddCommit; cmAdd->startingNodeId = addNodePtr.i; cmAdd->startingVersion = getNodeInfo(addNodePtr.i).m_version; NodeReceiverGroup rg(QMGR, c_clusterNodes); sendSignal(rg, GSN_CM_ADD, signal, CmAdd::SignalLength, JBA); DEBUG_START2(GSN_CM_ADD, rg, "AddCommit"); return; } case CmAdd::AddCommit:{ jam(); /****************************************/ /* Send commit to the new node so he */ /* will change PHASE into ZRUNNING */ /****************************************/ c_start.m_gsn = GSN_CM_ADD; c_start.m_nodes.clearWaitingFor(); c_start.m_nodes.setWaitingFor(addNodePtr.i); CmAdd * const cmAdd = (CmAdd*)signal->getDataPtrSend(); cmAdd->requestType = CmAdd::CommitNew; cmAdd->startingNodeId = addNodePtr.i; cmAdd->startingVersion = getNodeInfo(addNodePtr.i).m_version; sendSignal(calcQmgrBlockRef(addNodePtr.i), GSN_CM_ADD, signal, CmAdd::SignalLength, JBA); DEBUG_START(GSN_CM_ADD, addNodePtr.i, "CommitNew"); return; } case CmAdd::CommitNew: jam(); /** * Tell arbitration about new node. */ handleArbitNdbAdd(signal, addNodePtr.i); c_start.reset(); if (c_start.m_starting_nodes.get(addNodePtr.i)) { jam(); c_start.m_starting_nodes.clear(addNodePtr.i); if (c_start.m_starting_nodes.isclear()) { jam(); sendSttorryLab(signal); } } return; }//switch ndbrequire(false);}//Qmgr::execCM_ACKADD()/**------------------------------------------------------------------------- * WE HAVE BEEN INCLUDED INTO THE CLUSTER. IT IS NOW TIME TO CALCULATE WHICH * ARE OUR LEFT AND RIGHT NEIGHBOURS FOR THE HEARTBEAT PROTOCOL. *--------------------------------------------------------------------------*/void Qmgr::findNeighbours(Signal* signal) { UintR toldLeftNeighbour; UintR tfnLeftFound; UintR tfnMaxFound; UintR tfnMinFound; UintR tfnRightFound; NodeRecPtr fnNodePtr; NodeRecPtr fnOwnNodePtr; toldLeftNeighbour = cneighbourl; tfnLeftFound = 0; tfnMaxFound = 0; tfnMinFound = (UintR)-1; tfnRightFound = (UintR)-1; fnOwnNodePtr.i = getOwnNodeId(); ptrCheckGuard(fnOwnNodePtr, MAX_NDB_NODES, nodeRec); for (fnNodePtr.i = 1; fnNodePtr.i < MAX_NDB_NODES; fnNodePtr.i++) { jam(); ptrAss(fnNodePtr, nodeRec); if (fnNodePtr.i != fnOwnNodePtr.i) { if (fnNodePtr.p->phase == ZRUNNING) { if (tfnMinFound > fnNodePtr.p->ndynamicId) { jam(); tfnMinFound = fnNodePtr.p->ndynamicId; }//if if (tfnMaxFound < fnNodePtr.p->ndynamicId) { jam(); tfnMaxFound = fnNodePtr.p->ndynamicId; }//if if (fnOwnNodePtr.p->ndynamicId > fnNodePtr.p->ndynamicId) { jam(); if (fnNodePtr.p->ndynamicId > tfnLeftFound) { jam(); tfnLeftFound = fnNodePtr.p->ndynamicId; }//if } else { jam(); if (fnNodePtr.p->ndynamicId < tfnRightFound) { jam(); tfnRightFound = fnNodePtr.p->ndynamicId; }//if }//if }//if }//if }//for if (tfnLeftFound == 0) { if (tfnMinFound == (UintR)-1) { jam(); cneighbourl = ZNIL; } else { jam(); cneighbourl = translateDynamicIdToNodeId(signal, tfnMaxFound); }//if } else { jam(); cneighbourl = translateDynamicIdToNodeId(signal, tfnLeftFound); }//if if (tfnRightFound == (UintR)-1) { if (tfnMaxFound == 0) { jam(); cneighbourh = ZNIL; } else { jam(); cneighbourh = translateDynamicIdToNodeId(signal, tfnMinFound); }//if } else { jam(); cneighbourh = translateDynamicIdToNodeId(signal, tfnRightFound); }//if if (toldLeftNeighbour != cneighbourl) { jam(); if (cneighbourl != ZNIL) { jam(); /**-------------------------------------------------------------------*/ /* WE ARE SUPERVISING A NEW LEFT NEIGHBOUR. WE START WITH ALARM COUNT * EQUAL TO ZERO. *---------------------------------------------------------------------*/ fnNodePtr.i = cneighbourl; ptrCheckGuard(fnNodePtr, MAX_NDB_NODES, nodeRec); setNodeInfo(fnNodePtr.i).m_heartbeat_cnt= 0; }//if }//if signal->theData[0] = NDB_LE_FIND_NEIGHBOURS; signal->theData[1] = getOwnNodeId(); signal->theData[2] = cneighbourl; signal->theData[3] = cneighbourh; signal->theData[4] = fnOwnNodePtr.p->ndynamicId; UintR Tlen = 5; for (fnNodePtr.i = 1; fnNodePtr.i < MAX_NDB_NODES; fnNodePtr.i++) { jam(); ptrAss(fnNodePtr, nodeRec); if (fnNodePtr.i != fnOwnNodePtr.i) { if (fnNodePtr.p->phase == ZRUNNING) { jam(); signal->theData[Tlen] = fnNodePtr.i; signal->theData[Tlen + 1] = fnNodePtr.p->ndynamicId; if (Tlen < 25) { /*----------------------------------------------------------------*/ // This code can only report 11 nodes. // We need to update this when increasing the number of nodes // supported. /*-----------------------------------------------------------------*/ Tlen += 2; } }//if }//if }//for sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, Tlen, JBB);}//Qmgr::findNeighbours()/*4.10.7 INIT_DATA *//*---------------------------------------------------------------------------*//*---------------------------------------------------------------------------*/void Qmgr::initData(Signal* signal) { NodeRecPtr nodePtr; for (nodePtr.i = 1; nodePtr.i < MAX_NODES; nodePtr.i++) { ptrAss(nodePtr, nodeRec); nodePtr.p->ndynamicId = 0; if(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB){ nodePtr.p->phase = ZINIT; c_definedNodes.set(nodePtr.i); } else { nodePtr.p->phase = ZAPI_INACTIVE; } setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; nodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE; nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE; nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE; nodePtr.p->failState = NORMAL; nodePtr.p->rcv[0] = 0; nodePtr.p->rcv[1] = 0; }//for cfailureNr = 1; ccommitFailureNr = 1; cprepareFailureNr = 1; cnoFailedNodes = 0; cnoPrepFailedNodes = 0; creadyDistCom = ZFALSE; cpresident = ZNIL; c_start.m_president_candidate = ZNIL; c_start.m_president_candidate_gci = 0; cpdistref = 0; cneighbourh = ZNIL; cneighbourl = ZNIL; cdelayRegreq = ZDELAY_REGREQ; cactivateApiCheck = 0; ctoStatus = Q_NOT_ACTIVE; interface_check_timer.setDelay(1000); interface_check_timer.reset(); clatestTransactionCheck = 0; cLqhTimeSignalCount = 0; // catch-all for missing initializations memset(&arbitRec, 0, sizeof(arbitRec)); /** * Timeouts */ const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); ndbrequire(p != 0); Uint32 hbDBDB = 1500; Uint32 hbDBAPI = 1500; Uint32 arbitTimeout = 1000; c_restartPartialTimeout = 30000; c_restartPartionedTimeout = 60000; c_restartFailureTimeout = ~0; ndb_mgm_get_int_parameter(p, CFG_DB_HEARTBEAT_INTERVAL, &hbDBDB); ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL, &hbDBAPI); ndb_mgm_get_int_parameter(p, CFG_DB_ARBIT_TIMEOUT, &arbitTimeout); ndb_mgm_get_int_parameter(p, CFG_DB_START_PARTIAL_TIMEOUT, &c_restartPartialTimeout); ndb_mgm_get_int_parameter(p, CFG_DB_START_PARTITION_TIMEOUT, &c_restartPartionedTimeout); ndb_mgm_get_int_parameter(p, CFG_DB_START_FAILURE_TIMEOUT, &c_restartFailureTimeout); if(c_restartPartialTimeout == 0) { c_restartPartialTimeout = ~0; } if (c_restartPartionedTimeout ==0) { c_restartPartionedTimeout = ~0; } if (c_restartFailureTimeout == 0) { c_restartFailureTimeout = ~0; } setHbDelay(hbDBDB); setHbApiDelay(hbDBAPI); setArbitTimeout(arbitTimeout); arbitRec.state = ARBIT_NULL; // start state for all nodes arbitRec.apiMask[0].clear(); // prepare for ARBIT_CFG ArbitSignalData* const sd = (ArbitSignalData*)&signal->theData[0]; for (unsigned rank = 1; rank <= 2; rank++) { sd->sender = getOwnNodeId(); sd->code = rank; sd->node = 0; sd->ticket.clear(); sd->mask.clear(); ndb_mgm_configuration_iterator * iter = theConfiguration.getClusterConfigIterator(); for (ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)) { Uint32 tmp = 0; if (ndb_mgm_get_int_parameter(iter, CFG_NODE_ARBIT_RANK, &tmp) == 0 && tmp == rank){ Uint32 nodeId = 0; ndbrequire(!ndb_mgm_get_int_parameter(iter, CFG_NODE_ID, &nodeId)); sd->mask.set(nodeId); } } execARBIT_CFG(signal); } setNodeInfo(getOwnNodeId()).m_version = NDB_VERSION;}//Qmgr::initData()/**--------------------------------------------------------------------------- * HERE WE RECEIVE THE JOB TABLE SIGNAL EVERY 10 MILLISECONDS. * WE WILL USE THIS TO CHECK IF IT IS TIME TO CHECK THE NEIGHBOUR NODE. * WE WILL ALSO SEND A SIGNAL TO BLOCKS THAT NEED A TIME SIGNAL AND * DO NOT WANT TO USE JOB TABLE SIGNALS. *---------------------------------------------------------------------------*/void Qmgr::timerHandlingLab(Signal* signal) { NDB_TICKS TcurrentTime = NdbTick_CurrentMillisecond(); NodeRecPtr myNodePtr; myNodePtr.i = getOwnNodeId(); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); if (myNodePtr.p->phase == ZRUNNING) { jam(); /**--------------------------------------------------------------------- * WE ARE ONLY PART OF HEARTBEAT CLUSTER IF WE ARE UP AND RUNNING. *---------------------------------------------------------------------*/ if (hb_send_timer.check(TcurrentTime)) { jam(); sendHeartbeat(signal); hb_send_timer.reset(); } if (hb_check_timer.check(TcurrentTime)) { jam(); checkHeartbeat(signal); hb_check_timer.reset(); } } if (interface_check_timer.check(TcurrentTime)) { jam(); interface_check_timer.reset(); checkStartInterface(signal); } if (cactivateApiCheck != 0) { jam(); if (hb_api_timer.check(TcurrentTime)) { jam(); hb_api_timer.reset(); apiHbHandlingLab(signal); }//if if (clatestTransactionCheck == 0) { //------------------------------------------------------------- // Initialise the Transaction check timer. //------------------------------------------------------------- clatestTransactionCheck = TcurrentTime; }//if int counter = 0; while (TcurrentTime > ((NDB_TICKS)10 + clatestTransactionCheck)) { jam(); clatestTransactionCheck += (NDB_TICKS)10; sendSignal(DBTC_REF, GSN_TIME_SIGNAL, signal, 1, JBB); cLqhTimeSignalCount++; if (cLqhTimeSignalCount >= 100) { cLqhTimeSignalCount = 0; sendSignal(DBLQH_REF, GSN_TIME_SIGNAL, signal, 1, JBB); }//if counter++; if (counter > 1) { jam(); break; } else { ; }//if }//while }//if //-------------------------------------------------- // Resend this signal with 10 milliseconds delay. //-------------------------------------------------- signal->theData[0] = ZTIMER_HANDLING; sendSignalWithDelay(QMGR_REF, GSN_CONTINUEB, signal, 10, 1); return;}//Qmgr::timerHandlingLab()/*---------------------------------------------------------------------------*//* THIS MODULE HANDLES THE SENDING AND RECEIVING OF HEARTBEATS. *//*---------------------------------------------------------------------------*/void Qmgr::sendHeartbeat(Signal* signal) { NodeRecPtr localNodePtr; localNodePtr.i = cneighbourh; if (localNodePtr.i == ZNIL) { jam(); /**--------------------------------------------------------------------- * THERE ARE NO NEIGHBOURS. THIS IS POSSIBLE IF WE ARE THE ONLY NODE IN * THE CLUSTER.IN THIS CASE WE DO NOT NEED TO SEND ANY HEARTBEAT SIGNALS. *-----------------------------------------------------------------------*/ return; }//if ptrCheckGuard(localNodePtr, MAX_NDB_NODES, nodeRec); signal->theData[0] = getOwnNodeId(); sendSignal(localNodePtr.p->blockRef, GSN_CM_HEARTBEAT, signal, 1, JBA);#ifdef VM_TRACE signal->theData[0] = NDB_LE_SentHeartbeat; signal->theData[1] = localNodePtr.i; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); #endif}//Qmgr::sendHeartbeat()void Qmgr::checkHeartbeat(Signal* signal) { NodeRecPtr nodePtr; nodePtr.i = cneighbourl; if (nodePtr.i == ZNIL) { jam(); /**--------------------------------------------------------------------- * THERE ARE NO NEIGHBOURS. THIS IS POSSIBLE IF WE ARE THE ONLY NODE IN * THE CLUSTER. IN THIS CASE WE DO NOT NEED TO CHECK ANY HEARTBEATS. *-----------------------------------------------------------------------*/ return; }//if ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec); setNodeInfo(nodePtr.i).m_heartbeat_cnt++; ndbrequire(nodePtr.p->phase == ZRUNNING); ndbrequire(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB); if(getNodeInfo(nodePtr.i).m_heartbeat_cnt > 2){ signal->theData[0] = NDB_LE_MissedHeartbeat; signal->theData[1] = nodePtr.i; signal->theData[2] = getNodeInfo(nodePtr.i).m_heartbeat_cnt - 1; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB); } if (getNode
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -