📄 clustermgr.cpp
字号:
theNode.m_info.m_heartbeat_cnt = 0; theNode.hbCounter = 0; /** * make sure the node itself is marked connected even * if first API_REGCONF has not arrived */ theNode.m_state.m_connected_nodes.set(nodeId); if (theNode.m_info.m_type != NodeInfo::REP) { theNode.hbFrequency = 0; } theNode.m_info.m_version = 0; theNode.compatible = true; theNode.nfCompleteRep = true; theFacade.ReportNodeAlive(nodeId);}voidClusterMgr::reportDisconnected(NodeId nodeId){ assert(nodeId > 0 && nodeId < MAX_NODES); assert(noOfConnectedNodes > 0); noOfConnectedNodes--; theNodes[nodeId].connected = false; theNodes[nodeId].m_state.m_connected_nodes.clear(); reportNodeFailed(nodeId);}voidClusterMgr::reportNodeFailed(NodeId nodeId){ Node & theNode = theNodes[nodeId]; set_node_alive(theNode, false); theNode.m_info.m_connectCount ++; if(theNode.connected) { theFacade.doDisconnect(nodeId); } const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING); theNode.m_state.startLevel = NodeState::SL_NOTHING; if(report) { theFacade.ReportNodeDead(nodeId); } theNode.nfCompleteRep = false; if(noOfAliveNodes == 0) { NFCompleteRep rep; for(Uint32 i = 1; i<MAX_NODES; i++){ if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){ rep.failedNodeId = i; execNF_COMPLETEREP((Uint32*)&rep); } } }}/****************************************************************************** * Arbitrator ******************************************************************************/ArbitMgr::ArbitMgr(TransporterFacade & _fac) : theFacade(_fac){ DBUG_ENTER("ArbitMgr::ArbitMgr"); theThreadMutex = NdbMutex_Create(); theInputCond = NdbCondition_Create(); theInputMutex = NdbMutex_Create(); theRank = 0; theDelay = 0; theThread = 0; theInputTimeout = 0; theInputFull = false; memset(&theInputFull, 0, sizeof(theInputFull)); theState = StateInit; memset(&theStartReq, 0, sizeof(theStartReq)); memset(&theChooseReq1, 0, sizeof(theChooseReq1)); memset(&theChooseReq2, 0, sizeof(theChooseReq2)); memset(&theStopOrd, 0, 
sizeof(theStopOrd)); DBUG_VOID_RETURN;}ArbitMgr::~ArbitMgr(){ DBUG_ENTER("ArbitMgr::~ArbitMgr"); NdbMutex_Destroy(theThreadMutex); NdbCondition_Destroy(theInputCond); NdbMutex_Destroy(theInputMutex); DBUG_VOID_RETURN;}// Start arbitrator thread. This is kernel request.// First stop any previous thread since it is a left-over// which was never used and which now has wrong ticket.voidArbitMgr::doStart(const Uint32* theData){ ArbitSignal aSignal; NdbMutex_Lock(theThreadMutex); if (theThread != NULL) { aSignal.init(GSN_ARBIT_STOPORD, NULL); aSignal.data.code = StopRestart; sendSignalToThread(aSignal); void* value; NdbThread_WaitFor(theThread, &value); NdbThread_Destroy(&theThread); theState = StateInit; theInputFull = false; } aSignal.init(GSN_ARBIT_STARTREQ, theData); sendSignalToThread(aSignal); theThread = NdbThread_Create( runArbitMgr_C, (void**)this, 32768, "ndb_arbitmgr", NDB_THREAD_PRIO_HIGH); NdbMutex_Unlock(theThreadMutex);}// The "choose me" signal from a candidate.voidArbitMgr::doChoose(const Uint32* theData){ ArbitSignal aSignal; aSignal.init(GSN_ARBIT_CHOOSEREQ, theData); sendSignalToThread(aSignal);}// Stop arbitrator thread via stop signal from the kernel// or when exiting API program.voidArbitMgr::doStop(const Uint32* theData){ DBUG_ENTER("ArbitMgr::doStop"); ArbitSignal aSignal; NdbMutex_Lock(theThreadMutex); if (theThread != NULL) { aSignal.init(GSN_ARBIT_STOPORD, theData); if (theData == 0) { aSignal.data.code = StopExit; } else { aSignal.data.code = StopRequest; } sendSignalToThread(aSignal); void* value; NdbThread_WaitFor(theThread, &value); NdbThread_Destroy(&theThread); theState = StateInit; } NdbMutex_Unlock(theThreadMutex); DBUG_VOID_RETURN;}// private methodsextern "C" void*runArbitMgr_C(void* me){ ((ArbitMgr*) me)->threadMain(); return NULL;}voidArbitMgr::sendSignalToThread(ArbitSignal& aSignal){#ifdef DEBUG_ARBIT char buf[17] = ""; ndbout << "arbit recv: "; ndbout << " gsn=" << aSignal.gsn; ndbout << " send=" << aSignal.data.sender; ndbout 
<< " code=" << aSignal.data.code; ndbout << " node=" << aSignal.data.node; ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf)); ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf)); ndbout << endl;#endif aSignal.setTimestamp(); // signal arrival time NdbMutex_Lock(theInputMutex); while (theInputFull) { NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000); } theInputBuffer = aSignal; theInputFull = true; NdbCondition_Signal(theInputCond); NdbMutex_Unlock(theInputMutex);}voidArbitMgr::threadMain(){ ArbitSignal aSignal; aSignal = theInputBuffer; threadStart(aSignal); bool stop = false; while (! stop) { NdbMutex_Lock(theInputMutex); while (! theInputFull) { NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout); threadTimeout(); } aSignal = theInputBuffer; theInputFull = false; NdbCondition_Signal(theInputCond); NdbMutex_Unlock(theInputMutex); switch (aSignal.gsn) { case GSN_ARBIT_CHOOSEREQ: threadChoose(aSignal); break; case GSN_ARBIT_STOPORD: stop = true; break; } } threadStop(aSignal);}// handle events in the threadvoidArbitMgr::threadStart(ArbitSignal& aSignal){ theStartReq = aSignal; sendStartConf(theStartReq, ArbitCode::ApiStart); theState = StateStarted; theInputTimeout = 1000;}voidArbitMgr::threadChoose(ArbitSignal& aSignal){ switch (theState) { case StateStarted: // first REQ if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } theChooseReq1 = aSignal; if (theDelay == 0) { sendChooseConf(aSignal, ArbitCode::WinChoose); theState = StateFinished; theInputTimeout = 1000; break; } theState = StateChoose1; theInputTimeout = 1; return; case StateChoose1: // second REQ within Delay if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } theChooseReq2 = aSignal; theState = StateChoose2; theInputTimeout = 1; return; case StateChoose2: // too many REQs - refuse all if (! 
theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } sendChooseRef(theChooseReq1, ArbitCode::ErrToomany); sendChooseRef(theChooseReq2, ArbitCode::ErrToomany); sendChooseRef(aSignal, ArbitCode::ErrToomany); theState = StateFinished; theInputTimeout = 1000; return; default: sendChooseRef(aSignal, ArbitCode::ErrState); break; }}voidArbitMgr::threadTimeout(){ switch (theState) { case StateStarted: break; case StateChoose1: if (theChooseReq1.getTimediff() < theDelay) break; sendChooseConf(theChooseReq1, ArbitCode::WinChoose); theState = StateFinished; theInputTimeout = 1000; break; case StateChoose2: sendChooseConf(theChooseReq1, ArbitCode::WinChoose); sendChooseConf(theChooseReq2, ArbitCode::LoseChoose); theState = StateFinished; theInputTimeout = 1000; break; default: break; }}voidArbitMgr::threadStop(ArbitSignal& aSignal){ switch (aSignal.data.code) { case StopExit: switch (theState) { case StateStarted: sendStopRep(theStartReq, 0); break; case StateChoose1: // just in time sendChooseConf(theChooseReq1, ArbitCode::WinChoose); break; case StateChoose2: sendChooseConf(theChooseReq1, ArbitCode::WinChoose); sendChooseConf(theChooseReq2, ArbitCode::LoseChoose); break; case StateInit: case StateFinished: //?? 
break; } break; case StopRequest: break; case StopRestart: break; }}// output routinesvoidArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code){ ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_STARTCONF; copySignal.data.code = code; sendSignalToQmgr(copySignal);}voidArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code){ ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_CHOOSECONF; copySignal.data.code = code; sendSignalToQmgr(copySignal);}voidArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code){ ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_CHOOSEREF; copySignal.data.code = code; sendSignalToQmgr(copySignal);}voidArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code){ ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_STOPREP; copySignal.data.code = code; sendSignalToQmgr(copySignal);}/** * Send signal to QMGR. The input includes signal number and * signal data. The signal data is normally a copy of a received * signal so it contains expected arbitrator node id and ticket. * The sender in signal data is the QMGR node id. 
*/
void
ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
{
  // Outgoing API signal: sent from our own API_CLUSTERMGR block reference
  // and addressed to the QMGR block.
  NdbApiSignal outSignal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  outSignal.theVerId_signalNumber   = aSignal.gsn;
  outSignal.theReceiversBlockNumber = QMGR;
  outSignal.theTrace                = 0;
  outSignal.theLength               = ArbitSignalData::SignalLength;

  // Fill the payload: our own reference goes in as sender, the remaining
  // fields are copied over from the signal we were given.
  ArbitSignalData* payload =
    CAST_PTR(ArbitSignalData, outSignal.getDataPtrSend());

  payload->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  payload->code   = aSignal.data.code;
  payload->node   = aSignal.data.node;
  payload->ticket = aSignal.data.ticket;
  payload->mask   = aSignal.data.mask;

#ifdef DEBUG_ARBIT
  char buf[17] = "";
  ndbout << "arbit send: ";
  ndbout << " gsn=" << aSignal.gsn;
  ndbout << " recv=" << aSignal.data.sender;
  ndbout << " code=" << aSignal.data.code;
  ndbout << " node=" << aSignal.data.node;
  ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
  ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
  ndbout << endl;
#endif

  // The send is done while holding the facade mutex; the destination is
  // the sender recorded in the input signal's data.
  theFacade.lock_mutex();
  theFacade.sendSignalUnCond(&outSignal, aSignal.data.sender);
  theFacade.unlock_mutex();
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -