📄 mgmtsrvr.cpp
字号:
void MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId, bool alive, bool nfComplete){ DBUG_ENTER("MgmtSrvr::nodeStatusNotification"); DBUG_PRINT("enter",("nodeid= %d, alive= %d, nfComplete= %d", nodeId, alive, nfComplete)); ((MgmtSrvr*)mgmSrv)->handleStatus(nodeId, alive, nfComplete); DBUG_VOID_RETURN;}enum ndb_mgm_node_type MgmtSrvr::getNodeType(NodeId nodeId) const { if(nodeId >= MAX_NODES) return (enum ndb_mgm_node_type)-1; return nodeTypes[nodeId];}const char *MgmtSrvr::get_connect_address(Uint32 node_id){ if (m_connect_address[node_id].s_addr == 0 && theFacade && theFacade->theTransporterRegistry && theFacade->theClusterMgr && getNodeType(node_id) == NDB_MGM_NODE_TYPE_NDB) { const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(node_id); if (node.connected) { m_connect_address[node_id]= theFacade->theTransporterRegistry->get_connect_address(node_id); } } return inet_ntoa(m_connect_address[node_id]); }voidMgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const{ if (theFacade && theFacade->theClusterMgr) { for(Uint32 i = 0; i < MAX_NODES; i++) { if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) { const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); connected_nodes.bitOR(node.m_state.m_connected_nodes); } } }}boolMgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, struct sockaddr *client_addr, SOCKET_SIZE_TYPE *client_addr_len, int &error_code, BaseString &error_string, int log_event){ DBUG_ENTER("MgmtSrvr::alloc_node_id"); DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d", *nodeId, type, client_addr)); if (g_no_nodeid_checks) { if (*nodeId == 0) { error_string.appfmt("no-nodeid-checks set in management server.\n" "node id must be set explicitly in connectstring"); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; DBUG_RETURN(false); } DBUG_RETURN(true); } Guard g(m_node_id_mutex); int no_mgm= 0; NodeBitmask connected_nodes(m_reserved_nodes); get_connected_nodes(connected_nodes); { for(Uint32 i = 0; i < MAX_NODES; i++) if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM) no_mgm++; } bool found_matching_id= false; bool found_matching_type= false; bool found_free_node= false; unsigned id_found= 0; const char *config_hostname= 0; struct in_addr config_addr= {0}; int r_config_addr= -1; unsigned type_c= 0; if(NdbMutex_Lock(m_configMutex)) { // should not happen error_string.appfmt("unable to lock configuration mutex"); error_code = NDB_MGM_ALLOCID_ERROR; DBUG_RETURN(false); } ndb_mgm_configuration_iterator iter(* _config->m_configValues, CFG_SECTION_NODE); for(iter.first(); iter.valid(); iter.next()) { unsigned tmp= 0; if(iter.get(CFG_NODE_ID, &tmp)) require(false); if (*nodeId && *nodeId != tmp) continue; found_matching_id= true; if(iter.get(CFG_TYPE_OF_SECTION, &type_c)) require(false); if(type_c != (unsigned)type) continue; found_matching_type= true; if (connected_nodes.get(tmp)) continue; found_free_node= true; if(iter.get(CFG_NODE_HOST, &config_hostname)) require(false); if (config_hostname && config_hostname[0] == 0) config_hostname= 0; else if (client_addr) { // check hostname compatability const void *tmp_in= &(((sockaddr_in*)client_addr)->sin_addr); if((r_config_addr= Ndb_getInAddr(&config_addr, config_hostname)) != 0 || memcmp(&config_addr, tmp_in, sizeof(config_addr)) != 0) { struct in_addr tmp_addr; if(Ndb_getInAddr(&tmp_addr, "localhost") != 0 || memcmp(&tmp_addr, tmp_in, sizeof(config_addr)) != 0) { // not localhost#if 0 ndbout << "MgmtSrvr::getFreeNodeId compare failed for \"" << config_hostname << "\" id=" << tmp << endl;#endif continue; } // connecting through localhost // check if config_hostname is local if (!SocketServer::tryBind(0,config_hostname)) { continue; } } } else { // client_addr == 0 if (!SocketServer::tryBind(0,config_hostname)) { continue; } } if (*nodeId != 0 || type != NDB_MGM_NODE_TYPE_MGM || no_mgm == 1) { // any match is ok if (config_hostname == 0 && *nodeId == 0 && type != NDB_MGM_NODE_TYPE_MGM) { if (!id_found) // only set if not set earlier id_found= tmp; continue; /* continue looking for a nodeid with specified * hostname */ } assert(id_found == 0); id_found= tmp; break; } if (id_found) { // mgmt server may only have one match error_string.appfmt("Ambiguous node id's %d and %d.\n" "Suggest specifying node id in connectstring,\n" "or specifying unique host names in config file.", id_found, tmp); NdbMutex_Unlock(m_configMutex); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; DBUG_RETURN(false); } if (config_hostname == 0) { error_string.appfmt("Ambiguity for node id %d.\n" "Suggest specifying node id in connectstring,\n" "or specifying unique host names in config file,\n" "or specifying just one mgmt server in config file.", tmp); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; DBUG_RETURN(false); } id_found= tmp; // mgmt server matched, check for more matches } NdbMutex_Unlock(m_configMutex); if (id_found) { *nodeId= id_found; DBUG_PRINT("info", ("allocating node id %d",*nodeId)); { int r= 0; if (client_addr) m_connect_address[id_found]= ((struct sockaddr_in *)client_addr)->sin_addr; else if (config_hostname) r= Ndb_getInAddr(&(m_connect_address[id_found]), config_hostname); else { char name[256]; r= gethostname(name, sizeof(name)); if (r == 0) { name[sizeof(name)-1]= 0; r= Ndb_getInAddr(&(m_connect_address[id_found]), name); } } if (r) m_connect_address[id_found].s_addr= 0; } m_reserved_nodes.set(id_found); if (theFacade && id_found != theFacade->ownId()) { /** * Make sure we're ready to accept connections from this node */ theFacade->lock_mutex(); theFacade->doConnect(id_found); theFacade->unlock_mutex(); } char tmp_str[128]; m_reserved_nodes.getText(tmp_str); g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, " "m_reserved_nodes %s.", id_found, get_connect_address(id_found), tmp_str); DBUG_RETURN(true); } if (found_matching_type && !found_free_node) { // we have a temporary error which might be due to that // we have got the latest connect status from db-nodes. Force update. global_flag_send_heartbeat_now= 1; } BaseString type_string, type_c_string; { const char *alias, *str; alias= ndb_mgm_get_node_type_alias_string(type, &str); type_string.assfmt("%s(%s)", alias, str); alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)type_c, &str); type_c_string.assfmt("%s(%s)", alias, str); } if (*nodeId == 0) { if (found_matching_id) { if (found_matching_type) { if (found_free_node) { error_string.appfmt("Connection done from wrong host ip %s.", (client_addr)? inet_ntoa(((struct sockaddr_in *) (client_addr))->sin_addr):""); error_code = NDB_MGM_ALLOCID_ERROR; } else { error_string.appfmt("No free node id found for %s.", type_string.c_str()); error_code = NDB_MGM_ALLOCID_ERROR; } } else { error_string.appfmt("No %s node defined in config file.", type_string.c_str()); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; } } else { error_string.append("No nodes defined in config file."); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; } } else { if (found_matching_id) { if (found_matching_type) { if (found_free_node) { // have to split these into two since inet_ntoa overwrites itself error_string.appfmt("Connection with id %d done from wrong host ip %s,", *nodeId, inet_ntoa(((struct sockaddr_in *) (client_addr))->sin_addr)); error_string.appfmt(" expected %s(%s).", config_hostname, r_config_addr ? "lookup failed" : inet_ntoa(config_addr)); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; } else { error_string.appfmt("Id %d already allocated by another node.", *nodeId); error_code = NDB_MGM_ALLOCID_ERROR; } } else { error_string.appfmt("Id %d configured as %s, connect attempted as %s.", *nodeId, type_c_string.c_str(), type_string.c_str()); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; } } else { error_string.appfmt("No node defined with id=%d in config file.", *nodeId); error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH; } } if (log_event || error_code == NDB_MGM_ALLOCID_CONFIG_MISMATCH) { g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s." " Returned error string \"%s\"", *nodeId, client_addr != 0 ? inet_ntoa(((struct sockaddr_in *) (client_addr))->sin_addr) : "<none>", error_string.c_str()); NodeBitmask connected_nodes2; get_connected_nodes(connected_nodes2); BaseString tmp_connected, tmp_not_connected; for(Uint32 i = 0; i < MAX_NODES; i++) { if (connected_nodes2.get(i)) { if (!m_reserved_nodes.get(i)) tmp_connected.appfmt(" %d", i); } else if (m_reserved_nodes.get(i)) { tmp_not_connected.appfmt(" %d", i); } } if (tmp_connected.length() > 0) g_eventLogger.info("Mgmt server state: node id's %s connected but not reserved", tmp_connected.c_str()); if (tmp_not_connected.length() > 0) g_eventLogger.info("Mgmt server state: node id's %s not connected but reserved", tmp_not_connected.c_str()); } DBUG_RETURN(false);}boolMgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const { NodeId tmp = * nodeId; tmp++; while(nodeTypes[tmp] != type && tmp < MAX_NODES) tmp++; if(tmp == MAX_NODES){ return false; } * nodeId = tmp; return true;}#include "Services.hpp"voidMgmtSrvr::eventReport(const Uint32 * theData){ const EventReport * const eventReport = (EventReport *)&theData[0]; NodeId nodeId = eventReport->getNodeId(); Ndb_logevent_type type = eventReport->getEventType(); // Log event g_eventLogger.log(type, theData, nodeId, &m_event_listner[0].m_logLevel); m_event_listner.log(type, theData, nodeId);}/*************************************************************************** * Backup ***************************************************************************/intMgmtSrvr::startBackup(Uint32& backupId, int waitCompleted){ SignalSender ss(theFacade); ss.lock(); // lock will be released on exit NodeId nodeId = m_master_node; if (okToSendTo(nodeId, false) != 0) { bool next; nodeId = m_master_node = 0; while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true && okToSendTo(nodeId, false) != 0); if(!next) return NO_CONTACT_WITH_DB_NODES; } SimpleSignal ssig; BackupReq* req = CAST_PTR(BackupReq, ssig.getDataPtrSend()); ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ, BackupReq::SignalLength); req->senderData = 19; req->backupDataLen = 0; assert(waitCompleted < 3); req->flags = waitCompleted & 0x3; BackupEvent event; int do_send = 1; while (1) { if (do_send) { if (ss.sendSignal(nodeId, &ssig) != SEND_OK) { return SEND_OR_RECEIVE_FAILED; } if (waitCompleted == 0) return 0; do_send = 0; } SimpleSignal *signal = ss.waitFor(); int gsn = signal->readSignalNumber(); switch (gsn) { case GSN_BACKUP_CONF:{ const BackupConf * const conf = CAST_CONSTPTR(BackupConf, signal->getDataPtr()); event.Event = BackupEvent::BackupStarted; event.Started.BackupId = conf->backupId; event.Nodes = conf->nodes;#ifdef VM_TRACE ndbout_c("Backup(%d) master is %d", conf->backupId, refToNode(signal->header.theSendersBlockRef));#endif backupId = conf->backupId; if (waitCompleted == 1) return 0; // wait for next signal break; } case GSN_BACKUP_COMPLETE_REP:{ const BackupCompleteRep * const rep = CAST_CONSTPTR(BackupCompleteRep, signal->getDataPtr());#ifdef VM_TRACE ndbout_c("Backup(%d) completed %d", rep->backupId);#endif event.Event = BackupEvent::BackupCompleted; event.Completed.BackupId = rep->backupId; event.Completed.NoOfBytes = rep->noOfBytesLow; event.Completed.NoOfLogBytes = rep->noOfLogBytes; event.Completed.NoOfRecords = rep->noOfRecordsLow; event.Completed.NoOfLogRecords = rep->noOfLogRecords; event.Completed.stopGCP = rep->stopGCP; event.Completed.startGCP = rep->startGCP; event.Nodes = rep->nodes; if (signal->header.theLength >= BackupCompleteRep::SignalLength) { event.Completed.NoOfBytes += ((Uint64)rep->noOfBytesHigh) << 32; event.Completed.NoOfRecords += ((Uint64)rep->noOfRecordsHigh) << 32; } backupId = rep->backupId; return 0; } case GSN_BACKUP_REF:{ const BackupRef * const ref = CAST_CONSTPTR(BackupRef, signal->getDataPtr()); if(ref->errorCode == BackupRef::IAmNotMaster){ m_master_node = nodeId = refToNode(ref->masterRef);#ifdef VM_TRACE ndbout_c("I'm not master resending to %d", nodeId);#endif do_
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -