📄 mgmtsrvr.cpp
字号:
case NODE_TYPE_MGM: nodeTypes[id] = NDB_MGM_NODE_TYPE_MGM; break; case NODE_TYPE_REP: nodeTypes[id] = NDB_MGM_NODE_TYPE_REP; break; case NODE_TYPE_EXT_REP: default: break; } } } _props = NULL; BaseString error_string; if ((m_node_id_mutex = NdbMutex_Create()) == 0) { ndbout << "mutex creation failed line = " << __LINE__ << endl; require(false); } if (_ownNodeId == 0) // we did not get node id from other server { NodeId tmp= m_config_retriever->get_configuration_nodeid(); if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, 0, 0, error_string)){ ndbout << "Unable to obtain requested nodeid: " << error_string.c_str() << endl; require(false); } _ownNodeId = tmp; } { DBUG_PRINT("info", ("verifyConfig")); if (!m_config_retriever->verifyConfig(_config->m_configValues, _ownNodeId)) { ndbout << m_config_retriever->getErrorString() << endl; require(false); } } // Setup clusterlog as client[0] in m_event_listner { Ndb_mgmd_event_service::Event_listener se; se.m_socket = NDB_INVALID_SOCKET; for(size_t t = 0; t<LogLevel::LOGLEVEL_CATEGORIES; t++){ se.m_logLevel.setLogLevel((LogLevel::EventCategory)t, 7); } se.m_logLevel.setLogLevel(LogLevel::llError, 15); se.m_logLevel.setLogLevel(LogLevel::llConnection, 8); se.m_logLevel.setLogLevel(LogLevel::llBackup, 15); m_event_listner.m_clients.push_back(se); m_event_listner.m_logLevel = se.m_logLevel; } DBUG_VOID_RETURN;}//****************************************************************************//****************************************************************************bool MgmtSrvr::check_start() { if (_config == 0) { DEBUG("MgmtSrvr.cpp: _config is NULL."); return false; } return true;}bool MgmtSrvr::start(BaseString &error_string){ int mgm_connect_result; DBUG_ENTER("MgmtSrvr::start"); if (_props == NULL) { if (!check_start()) { error_string.append("MgmtSrvr.cpp: check_start() failed."); DBUG_RETURN(false); } } theFacade= TransporterFacade::theFacadeInstance = new TransporterFacade(); if(theFacade == 0) { DEBUG("MgmtSrvr.cpp: theFacade is NULL."); error_string.append("MgmtSrvr.cpp: theFacade is NULL."); DBUG_RETURN(false); } if ( theFacade->start_instance (_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) { DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0."); DBUG_RETURN(false); } MGM_REQUIRE(_blockNumber == 1); // Register ourself at TransporterFacade to be able to receive signals // and to be notified when a database process has died. _blockNumber = theFacade->open(this, signalReceivedNotification, nodeStatusNotification); if(_blockNumber == -1){ DEBUG("MgmtSrvr.cpp: _blockNumber is -1."); error_string.append("MgmtSrvr.cpp: _blockNumber is -1."); theFacade->stop_instance(); theFacade = 0; DBUG_RETURN(false); } if((mgm_connect_result= connect_to_self()) < 0) { ndbout_c("Unable to connect to our own ndb_mgmd (Error %d)", mgm_connect_result); ndbout_c("This is probably a bug."); } TransporterRegistry *reg = theFacade->get_registry(); for(unsigned int i=0;i<reg->m_transporter_interface.size();i++) { BaseString msg; DBUG_PRINT("info",("Setting dynamic port %d->%d : %d", reg->get_localNodeId(), reg->m_transporter_interface[i].m_remote_nodeId, reg->m_transporter_interface[i].m_s_service_port ) ); int res = setConnectionDbParameter((int)reg->get_localNodeId(), (int)reg->m_transporter_interface[i] .m_remote_nodeId, (int)CFG_CONNECTION_SERVER_PORT, reg->m_transporter_interface[i] .m_s_service_port, msg); DBUG_PRINT("info",("Set result: %d: %s",res,msg.c_str())); } _ownReference = numberToRef(_blockNumber, _ownNodeId); startEventLog(); // Set the initial confirmation count for subscribe requests confirm // from NDB nodes in the cluster. // // Loglevel thread _logLevelThread = NdbThread_Create(logLevelThread_C, (void**)this, 32768, "MgmtSrvr_Loglevel", NDB_THREAD_PRIO_LOW); DBUG_RETURN(true);}//****************************************************************************//****************************************************************************MgmtSrvr::~MgmtSrvr() { if(theFacade != 0){ theFacade->stop_instance(); delete theFacade; theFacade = 0; } stopEventLog(); NdbMutex_Destroy(m_node_id_mutex); NdbCondition_Destroy(theMgmtWaitForResponseCondPtr); NdbMutex_Destroy(m_configMutex); if(m_newConfig != NULL) free(m_newConfig); if(_config != NULL) delete _config; // End set log level thread void* res = 0; _isStopThread = true; if (_logLevelThread != NULL) { NdbThread_WaitFor(_logLevelThread, &res); NdbThread_Destroy(&_logLevelThread); } if (m_config_retriever) delete m_config_retriever;}//****************************************************************************//****************************************************************************int MgmtSrvr::okToSendTo(NodeId nodeId, bool unCond) { if(nodeId == 0 || getNodeType(nodeId) != NDB_MGM_NODE_TYPE_NDB) return WRONG_PROCESS_TYPE; // Check if we have contact with it if(unCond){ if(theFacade->theClusterMgr->getNodeInfo(nodeId).connected) return 0; } else if (theFacade->get_node_alive(nodeId) == true) return 0; return NO_CONTACT_WITH_PROCESS;}void report_unknown_signal(SimpleSignal *signal){ g_eventLogger.error("Unknown signal received. SignalNumber: " "%i from (%d, %x)", signal->readSignalNumber(), refToNode(signal->header.theSendersBlockRef), refToBlock(signal->header.theSendersBlockRef));}/***************************************************************************** * Starting and stopping database nodes ****************************************************************************/int MgmtSrvr::start(int nodeId){ INIT_SIGNAL_SENDER(ss,nodeId); SimpleSignal ssig; StartOrd* const startOrd = CAST_PTR(StartOrd, ssig.getDataPtrSend()); ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_START_ORD, StartOrd::SignalLength); startOrd->restartInfo = 0; return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;}/***************************************************************************** * Version handling *****************************************************************************/int MgmtSrvr::versionNode(int nodeId, Uint32 &version, const char **address){ version= 0; if (getOwnNodeId() == nodeId) { /** * If we're inquiring about our own node id, * We know what version we are (version implies connected for mgm) * but would like to find out from elsewhere what address they're using * to connect to us. This means that secondary mgm servers * can list ip addresses for mgm servers. * * If we don't get an address (i.e. no db nodes), * we get the address from the configuration. */ sendVersionReq(nodeId, version, address); version= NDB_VERSION; if(!*address) { ndb_mgm_configuration_iterator iter(*_config->m_configValues, CFG_SECTION_NODE); unsigned tmp= 0; for(iter.first();iter.valid();iter.next()) { if(iter.get(CFG_NODE_ID, &tmp)) require(false); if((unsigned)nodeId!=tmp) continue; if(iter.get(CFG_NODE_HOST, address)) require(false); break; } } } else if (getNodeType(nodeId) == NDB_MGM_NODE_TYPE_NDB) { ClusterMgr::Node node= theFacade->theClusterMgr->getNodeInfo(nodeId); if(node.connected) version= node.m_info.m_version; *address= get_connect_address(nodeId); } else if (getNodeType(nodeId) == NDB_MGM_NODE_TYPE_API || getNodeType(nodeId) == NDB_MGM_NODE_TYPE_MGM) { return sendVersionReq(nodeId, version, address); } return 0;}int MgmtSrvr::sendVersionReq(int v_nodeId, Uint32 &version, const char **address){ SignalSender ss(theFacade); ss.lock(); SimpleSignal ssig; ApiVersionReq* req = CAST_PTR(ApiVersionReq, ssig.getDataPtrSend()); req->senderRef = ss.getOwnRef(); req->nodeId = v_nodeId; ssig.set(ss, TestOrd::TraceAPI, QMGR, GSN_API_VERSION_REQ, ApiVersionReq::SignalLength); int do_send = 1; NodeId nodeId; while (1) { if (do_send) { bool next; nodeId = 0; while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true && okToSendTo(nodeId, true) != 0); const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(nodeId); if(next && node.m_state.startLevel != NodeState::SL_STARTED) { NodeId tmp=nodeId; while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true && okToSendTo(nodeId, true) != 0); if(!next) nodeId= tmp; } if(!next) return NO_CONTACT_WITH_DB_NODES; if (ss.sendSignal(nodeId, &ssig) != SEND_OK) { return SEND_OR_RECEIVE_FAILED; } do_send = 0; } SimpleSignal *signal = ss.waitFor(); int gsn = signal->readSignalNumber(); switch (gsn) { case GSN_API_VERSION_CONF: { const ApiVersionConf * const conf = CAST_CONSTPTR(ApiVersionConf, signal->getDataPtr()); assert(conf->nodeId == v_nodeId); version = conf->version; struct in_addr in; in.s_addr= conf->inet_addr; *address= inet_ntoa(in); return 0; } case GSN_NF_COMPLETEREP:{ const NFCompleteRep * const rep = CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr()); if (rep->failedNodeId == nodeId) do_send = 1; // retry with other node continue; } case GSN_NODE_FAILREP:{ const NodeFailRep * const rep = CAST_CONSTPTR(NodeFailRep, signal->getDataPtr()); if (NodeBitmask::get(rep->theNodes,nodeId)) do_send = 1; // retry with other node continue; } default: report_unknown_signal(signal); return SEND_OR_RECEIVE_FAILED; } break; } // while(1) return 0;}int MgmtSrvr::sendStopMgmd(NodeId nodeId, bool abort, bool stop, bool restart, bool nostart, bool initialStart){ const char* hostname; Uint32 port; BaseString connect_string; { Guard g(m_configMutex); { ndb_mgm_configuration_iterator iter(* _config->m_configValues, CFG_SECTION_NODE); if(iter.first()) return SEND_OR_RECEIVE_FAILED; if(iter.find(CFG_NODE_ID, nodeId)) return SEND_OR_RECEIVE_FAILED; if(iter.get(CFG_NODE_HOST, &hostname)) return SEND_OR_RECEIVE_FAILED; } { ndb_mgm_configuration_iterator iter(* _config->m_configValues, CFG_SECTION_NODE); if(iter.first()) return SEND_OR_RECEIVE_FAILED; if(iter.find(CFG_NODE_ID, nodeId)) return SEND_OR_RECEIVE_FAILED; if(iter.get(CFG_MGM_PORT, &port)) return SEND_OR_RECEIVE_FAILED; } if( strlen(hostname) == 0 ) return SEND_OR_RECEIVE_FAILED; } connect_string.assfmt("%s:%u",hostname,port); DBUG_PRINT("info",("connect string: %s",connect_string.c_str())); NdbMgmHandle h= ndb_mgm_create_handle(); if ( h && connect_string.length() > 0 ) { ndb_mgm_set_connectstring(h,connect_string.c_str()); if(ndb_mgm_connect(h,1,0,0)) { DBUG_PRINT("info",("failed ndb_mgm_connect")); return SEND_OR_RECEIVE_FAILED; } if(!restart) { if(ndb_mgm_stop(h, 1, (const int*)&nodeId) < 0) { return SEND_OR_RECEIVE_FAILED; } } else { int nodes[1]; nodes[0]= (int)nodeId; if(ndb_mgm_restart2(h, 1, nodes, initialStart, nostart, abort) < 0) { return SEND_OR_RECEIVE_FAILED; } } } ndb_mgm_destroy_handle(&h); return 0;}/* * Common method for handeling all STOP_REQ signalling that * is used by Stopping, Restarting and Single user commands * * In the event that we need to stop a mgmd, we create a mgm * client connection to that mgmd and stop it that way. * This allows us to stop mgm servers when there isn't any real * distributed communication up. */int MgmtSrvr::sendSTOP_REQ(const Vector<NodeId> &node_ids, NodeBitmask &stoppedNodes, Uint32 singleUserNodeId, bool abort, bool stop, bool restart, bool nostart, bool initialStart){ int error = 0; DBUG_ENTER("MgmtSrvr::sendSTOP_REQ"); DBUG_PRINT("enter", ("no of nodes: %d singleUseNodeId: %d " "abort: %d stop: %d restart: %d " "nostart: %d initialStart: %d", node_ids.size(), singleUserNodeId, abort, stop, restart, nostart, initialStart)); stoppedNodes.clear(); SignalSender ss(theFacade); ss.lock(); // lock will be released on exit SimpleSignal ssig; StopReq* const stopReq = CAST_PTR(StopReq, ssig.getDataPtrSend()); ssig.set(ss, TestOrd::TraceAPI, NDBCNTR, GSN_STOP_REQ, StopReq::SignalLength); stopReq->requestInfo = 0; stopReq->apiTimeout = 5000; stopReq->transactionTimeout = 1000; stopReq->readOperationTimeout = 1000; stopReq->operationTimeout = 1000; stopReq->senderData = 12;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -