📄 qmgrmain.cpp
字号:
}//if }//if }//for return;}//Qmgr::apiHbHandlingLab()void Qmgr::checkStartInterface(Signal* signal) { NodeRecPtr nodePtr; /*------------------------------------------------------------------------*/ // This method is called once per second. After a disconnect we wait at // least three seconds before allowing new connects. We will also ensure // that handling of the failure is completed before we allow new connections. /*------------------------------------------------------------------------*/ for (nodePtr.i = 1; nodePtr.i < MAX_NODES; nodePtr.i++) { ptrAss(nodePtr, nodeRec); if (nodePtr.p->phase == ZFAIL_CLOSING) { jam(); setNodeInfo(nodePtr.i).m_heartbeat_cnt++; if (c_connectedNodes.get(nodePtr.i)){ jam(); /*-------------------------------------------------------------------*/ // We need to ensure that the connection is not restored until it has // been disconnected for at least three seconds. /*-------------------------------------------------------------------*/ setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; }//if if ((getNodeInfo(nodePtr.i).m_heartbeat_cnt > 3) && (nodePtr.p->failState == NORMAL)) { /**------------------------------------------------------------------ * WE HAVE DISCONNECTED THREE SECONDS AGO. WE ARE NOW READY TO * CONNECT AGAIN AND ACCEPT NEW REGISTRATIONS FROM THIS NODE. * WE WILL NOT ALLOW CONNECTIONS OF API NODES UNTIL API FAIL HANDLING * IS COMPLETE. *-------------------------------------------------------------------*/ nodePtr.p->failState = NORMAL; if (getNodeInfo(nodePtr.i).m_type != NodeInfo::DB){ jam(); nodePtr.p->phase = ZAPI_INACTIVE; } else { jam(); nodePtr.p->phase = ZINIT; }//if setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA); } else { if(((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 60) == 0){ char buf[100]; BaseString::snprintf(buf, sizeof(buf), "Failure handling of node %d has not completed in %d min." " - state = %d", nodePtr.i, (getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1)/60, nodePtr.p->failState); warningEvent(buf); } } }//if }//for return;}//Qmgr::checkStartInterface()/**------------------------------------------------------------------------- * This method is called when a DISCONNECT_REP signal arrived which means that * the API node is gone and we want to release resources in TC/DICT blocks. *---------------------------------------------------------------------------*/void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo) { NodeRecPtr failedNodePtr; jamEntry(); failedNodePtr.i = failedNodeNo; signal->theData[0] = failedNodePtr.i; signal->theData[1] = QMGR_REF; ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec); ndbrequire(failedNodePtr.p->failState == NORMAL); failedNodePtr.p->failState = WAITING_FOR_FAILCONF1; sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA); /**------------------------------------------------------------------------- * THE OTHER NODE WAS AN API NODE. THE COMMUNICATION LINK IS ALREADY * BROKEN AND THUS NO ACTION IS NEEDED TO BREAK THE CONNECTION. * WE ONLY NEED TO SET PARAMETERS TO ENABLE A NEW CONNECTION IN A FEW * SECONDS. *-------------------------------------------------------------------------*/ setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0; CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0]; closeCom->xxxBlockRef = reference(); closeCom->failNo = 0; closeCom->noOfNodes = 1; NodeBitmask::clear(closeCom->theNodes); NodeBitmask::set(closeCom->theNodes, failedNodePtr.i); sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, CloseComReqConf::SignalLength, JBA);}//Qmgr::sendApiFailReq()void Qmgr::execAPI_FAILCONF(Signal* signal) { NodeRecPtr failedNodePtr; jamEntry(); failedNodePtr.i = signal->theData[0]; ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec); if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF1){ jam(); failedNodePtr.p->rcv[0] = signal->theData[1]; failedNodePtr.p->failState = WAITING_FOR_FAILCONF2; } else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF2) { failedNodePtr.p->rcv[1] = signal->theData[1]; failedNodePtr.p->failState = NORMAL; if (failedNodePtr.p->rcv[0] == failedNodePtr.p->rcv[1]) { jam(); systemErrorLab(signal, __LINE__); } else { jam(); failedNodePtr.p->rcv[0] = 0; failedNodePtr.p->rcv[1] = 0; }//if } else { jam();#ifdef VM_TRACE ndbout << "failedNodePtr.p->failState = " << (Uint32)(failedNodePtr.p->failState) << endl;#endif systemErrorLab(signal, __LINE__); }//if return;}//Qmgr::execAPI_FAILCONF()void Qmgr::execNDB_FAILCONF(Signal* signal) { NodeRecPtr failedNodePtr; NodeRecPtr nodePtr; jamEntry(); failedNodePtr.i = signal->theData[0]; if (ERROR_INSERTED(930)) { CLEAR_ERROR_INSERT_VALUE; infoEvent("Discarding NDB_FAILCONF for %u", failedNodePtr.i); return; } ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec); if (failedNodePtr.p->failState == WAITING_FOR_NDB_FAILCONF){ failedNodePtr.p->failState = NORMAL; } else { jam(); char buf[100]; BaseString::snprintf(buf, 100, "Received NDB_FAILCONF for node %u with state: %d %d", failedNodePtr.i, failedNodePtr.p->phase, failedNodePtr.p->failState); progError(__LINE__, 0, buf); systemErrorLab(signal, __LINE__); }//if if (cpresident == getOwnNodeId()) { jam(); /** * Prepare a NFCompleteRep and send to all connected API's * They can then abort all transaction waiting for response from * the failed node */ NFCompleteRep * const nfComp = (NFCompleteRep *)&signal->theData[0]; nfComp->blockNo = QMGR_REF; nfComp->nodeId = getOwnNodeId(); nfComp->failedNodeId = failedNodePtr.i; for (nodePtr.i = 1; nodePtr.i < MAX_NODES; nodePtr.i++) { jam(); ptrAss(nodePtr, nodeRec); if (nodePtr.p->phase == ZAPI_ACTIVE){ jam(); sendSignal(nodePtr.p->blockRef, GSN_NF_COMPLETEREP, signal, NFCompleteRep::SignalLength, JBA); }//if }//for } return;}//Qmgr::execNDB_FAILCONF()/*******************************//* DISCONNECT_REP *//*******************************/const char *lookupConnectionError(Uint32 err);void Qmgr::execDISCONNECT_REP(Signal* signal) { jamEntry(); const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; const Uint32 nodeId = rep->nodeId; const Uint32 err = rep->err; c_connectedNodes.clear(nodeId); NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrCheckGuard(nodePtr, MAX_NODES, nodeRec); switch(nodePtr.p->phase){ case ZRUNNING: jam(); break; case ZINIT: ndbrequire(false); case ZSTARTING: progError(__LINE__, NDBD_EXIT_CONNECTION_SETUP_FAILED, lookupConnectionError(err)); ndbrequire(false); case ZPREPARE_FAIL: ndbrequire(false); case ZFAIL_CLOSING: ndbrequire(false); case ZAPI_ACTIVE: ndbrequire(false); case ZAPI_INACTIVE: ndbrequire(false); } node_failed(signal, nodeId);}//DISCONNECT_REPvoid Qmgr::node_failed(Signal* signal, Uint16 aFailedNode) { NodeRecPtr failedNodePtr; /**------------------------------------------------------------------------ * A COMMUNICATION LINK HAS BEEN DISCONNECTED. WE MUST TAKE SOME ACTION * DUE TO THIS. *-----------------------------------------------------------------------*/ failedNodePtr.i = aFailedNode; ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec); if (getNodeInfo(failedNodePtr.i).getType() == NodeInfo::DB){ jam(); /**--------------------------------------------------------------------- * THE OTHER NODE IS AN NDB NODE, WE HANDLE IT AS IF A HEARTBEAT * FAILURE WAS DISCOVERED. *---------------------------------------------------------------------*/ switch(failedNodePtr.p->phase){ case ZRUNNING: jam(); failReportLab(signal, aFailedNode, FailRep::ZLINK_FAILURE); return; case ZFAIL_CLOSING: jam(); return; case ZSTARTING: c_start.reset(); // Fall-through default: jam(); /*---------------------------------------------------------------------*/ // The other node is still not in the cluster but disconnected. // We must restart communication in three seconds. /*---------------------------------------------------------------------*/ failedNodePtr.p->failState = NORMAL; failedNodePtr.p->phase = ZFAIL_CLOSING; setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0; CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0]; closeCom->xxxBlockRef = reference(); closeCom->failNo = 0; closeCom->noOfNodes = 1; NodeBitmask::clear(closeCom->theNodes); NodeBitmask::set(closeCom->theNodes, failedNodePtr.i); sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, CloseComReqConf::SignalLength, JBA); }//if return; } /** * API code */ jam(); if (failedNodePtr.p->phase != ZFAIL_CLOSING){ jam(); //------------------------------------------------------------------------- // The API was active and has now failed. We need to initiate API failure // handling. If the API had already failed then we can ignore this // discovery. //------------------------------------------------------------------------- failedNodePtr.p->phase = ZFAIL_CLOSING; sendApiFailReq(signal, aFailedNode); arbitRec.code = ArbitCode::ApiFail; handleArbitApiFail(signal, aFailedNode); }//if return;}//Qmgr::node_failed()/**-------------------------------------------------------------------------- * AN API NODE IS REGISTERING. IF FOR THE FIRST TIME WE WILL ENABLE * COMMUNICATION WITH ALL NDB BLOCKS. *---------------------------------------------------------------------------*//*******************************//* API_REGREQ *//*******************************/void Qmgr::execAPI_REGREQ(Signal* signal) { jamEntry(); ApiRegReq* req = (ApiRegReq*)signal->getDataPtr(); const Uint32 version = req->version; const BlockReference ref = req->ref; NodeRecPtr apiNodePtr; apiNodePtr.i = refToNode(ref); ptrCheckGuard(apiNodePtr, MAX_NODES, nodeRec); #if 0 ndbout_c("Qmgr::execAPI_REGREQ: Recd API_REGREQ (NodeId=%d)", apiNodePtr.i);#endif bool compatability_check; NodeInfo::NodeType type= getNodeInfo(apiNodePtr.i).getType(); switch(type){ case NodeInfo::API: compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version); break; case NodeInfo::MGM: compatability_check = ndbCompatible_ndb_mgmt(NDB_VERSION, version); break; case NodeInfo::REP: // compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version); // break; case NodeInfo::DB: case NodeInfo::INVALID: default: sendApiRegRef(signal, ref, ApiRegRef::WrongType); infoEvent("Invalid connection attempt with type %d", type); return; } if (!compatability_check) { jam(); char buf[NDB_VERSION_STRING_BUF_SZ]; infoEvent("Connection attempt from %s id=%d with %s " "incompatible with %s", type == NodeInfo::API ? "api or mysqld" : "management server", apiNodePtr.i, getVersionString(version,"",buf,sizeof(buf)), NDB_VERSION_STRING); apiNodePtr.p->phase = ZAPI_INACTIVE; sendApiRegRef(signal, ref, ApiRegRef::UnsupportedVersion); return; } setNodeInfo(apiNodePtr.i).m_version = version; setNodeInfo(apiNodePtr.i).m_heartbeat_cnt= 0; ApiRegConf * const apiRegConf = (ApiRegConf *)&signal->theData[0]; apiRegConf->qmgrRef = reference(); apiRegConf->apiHeartbeatFrequency = (chbApiDelay / 10); apiRegConf->version = NDB_VERSION; apiRegConf->nodeState = getNodeState(); { NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec); Uint32 dynamicId = nodePtr.p->ndynamicId; if(apiRegConf->nodeState.masterNodeId != getOwnNodeId()){ jam(); apiRegConf->nodeState.dynamicId = dynamicId; } else { apiRegConf->nodeState.dynamicId = -dynamicId; } } apiRegConf->nodeState.m_connected_nodes.assign(c_connectedNodes); sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB); if ((getNodeState().startLevel == NodeState::SL_STARTED || getNodeState().getSingleUserMode()) && apiNodePtr.p->phase == ZAPI_INACTIVE) { jam(); /**---------------------------------------------------------------------- * THE API NODE IS REGISTERING. WE WILL ACCEPT IT BY CHANGING STATE AND * SENDING A CONFIRM. *----------------------------------------------------------------------*/ apiNodePtr.p->phase = ZAPI_ACTIVE; apiNodePtr.p->blockRef = ref; signal->theData[0] = apiNodePtr.i; sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA); } return;}//Qmgr::execAPI_REGREQ()voidQmgr::execAPI_VERSION_REQ(Signal * signal) { jamEntry(); ApiVersionReq * const re
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -