📄 mgmtsrvr.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>

#include "MgmtSrvr.hpp"
#include "MgmtErrorReporter.hpp"
#include <ConfigRetriever.hpp>

#include <NdbOut.hpp>
#include <NdbApiSignal.hpp>
#include <kernel_types.h>
#include <RefConvert.hpp>
#include <BlockNumbers.h>
#include <GlobalSignalNumbers.h>
#include <signaldata/TestOrd.hpp>
#include <signaldata/TamperOrd.hpp>
#include <signaldata/StartOrd.hpp>
#include <signaldata/ApiVersion.hpp>
#include <signaldata/ResumeReq.hpp>
#include <signaldata/SetLogLevelOrd.hpp>
#include <signaldata/EventSubscribeReq.hpp>
#include <signaldata/EventReport.hpp>
#include <signaldata/DumpStateOrd.hpp>
#include <signaldata/BackupSignalData.hpp>
#include <signaldata/GrepImpl.hpp>
#include <signaldata/ManagementServer.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/NodeFailRep.hpp>
#include <NdbSleep.h>
#include <EventLogger.hpp>
#include <DebuggerNames.hpp>
#include <ndb_version.h>

#include <SocketServer.hpp>
#include <NdbConfig.h>

#include <NdbAutoPtr.hpp>

#include <ndberror.h>

#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
#include <m_string.h>

#include <SignalSender.hpp>

extern bool g_StopServer;
extern bool g_RestartServer;

/**
 * DEBUG(x) streams x to ndbout when MGM_SRV_DEBUG is defined and expands
 * to nothing otherwise.
 */
//#define MGM_SRV_DEBUG
#ifdef MGM_SRV_DEBUG
#define DEBUG(x) do ndbout << x << endl; while(0)
#else
#define DEBUG(x)
#endif

/**
 * Declares a SignalSender named ss on theFacade, locks it, and makes the
 * enclosing function return the result of okToSendTo(nodeId, true) when
 * that result is non-zero.
 */
#define INIT_SIGNAL_SENDER(ss,nodeId) \
  SignalSender ss(theFacade); \
  ss.lock(); /* lock will be released on exit */ \
  {\
    int result = okToSendTo(nodeId, true);\
    if (result != 0) {\
      return result;\
    }\
  }

extern int global_flag_send_heartbeat_now;
extern int g_no_nodeid_checks;
extern my_bool opt_core;

/**
 * Terminates the process when v is false: abort() if opt_core is set,
 * exit(-1) otherwise. Does nothing when v is true.
 */
static void require(bool v)
{
  if(!v)
  {
    if (opt_core)
      abort();
    else
      exit(-1);
  }
}

/**
 * C-style thread entry point: casts m to MgmtSrvr* and runs
 * logLevelThreadRun() on it.
 *
 * @param m  pointer to the MgmtSrvr instance
 * @return   always 0
 */
void *
MgmtSrvr::logLevelThread_C(void* m)
{
  MgmtSrvr *mgm = (MgmtSrvr*)m;
  mgm->logLevelThreadRun();
  return 0;
}

extern EventLogger g_eventLogger;

/**
 * Prints the level of every category of ll as
 * "[LogLevel: <l0> <l1> ... ]".
 */
static NdbOut&
operator<<(NdbOut& out, const LogLevel & ll)
{
  out << "[LogLevel: ";
  for(size_t i = 0; i<LogLevel::LOGLEVEL_CATEGORIES; i++)
    out << ll.getLogLevel((LogLevel::EventCategory)i) << " ";
  out << "]";
  return out;
}

/**
 * Body of the log level thread. Until _isStopThread is set it repeatedly
 *  - drains m_started_nodes, applying the current event reporting level
 *    and the stored per-node log level to each node, then
 *  - drains m_log_level_requests, dispatching each request either as an
 *    event reporting level (blockRef == 0) or as a node log level,
 * and then sleeps _logLevelThreadSleep milliseconds.
 *
 * Each queue is unlocked while the *Impl functions are called and
 * re-locked before the queue is inspected again.
 */
void
MgmtSrvr::logLevelThreadRun()
{
  while (!_isStopThread) {
    /**
     * Handle started nodes
     */
    EventSubscribeReq req;
    req = m_event_listner[0].m_logLevel;
    req.blockRef = _ownReference;

    SetLogLevelOrd ord;

    m_started_nodes.lock();
    while(m_started_nodes.size() > 0){
      Uint32 node = m_started_nodes[0];
      m_started_nodes.erase(0, false);
      m_started_nodes.unlock();

      setEventReportingLevelImpl(node, req);

      ord = m_nodeLogLevel[node];
      setNodeLogLevelImpl(node, ord);

      m_started_nodes.lock();
    }
    m_started_nodes.unlock();

    m_log_level_requests.lock();
    while(m_log_level_requests.size() > 0){
      req = m_log_level_requests[0];
      m_log_level_requests.erase(0, false);
      m_log_level_requests.unlock();

      // NOTE(review): tmp is assigned but never read in this function.
      LogLevel tmp;
      tmp = req;

      if(req.blockRef == 0){
        // No target block given: subscribe with our own reference.
        req.blockRef = _ownReference;
        setEventReportingLevelImpl(0, req);
      } else {
        ord = req;
        setNodeLogLevelImpl(req.blockRef, ord);
      }
      m_log_level_requests.lock();
    }
    m_log_level_requests.unlock();
    NdbSleep_MilliSleep(_logLevelThreadSleep);
  }
}

/**
 * Sets the event logger category to "MgmSrvr" and adds a log handler.
 *
 * The destination is read from CFG_LOG_DESTINATION of this node's
 * configuration section; when it is absent or empty a FILE handler on
 * the cluster log file name for _ownNodeId is used instead. Returns
 * silently if this node's section cannot be found. A failure to add the
 * handler is reported as a warning on ndbout.
 */
void
MgmtSrvr::startEventLog()
{
  NdbMutex_Lock(m_configMutex);

  g_eventLogger.setCategory("MgmSrvr");

  ndb_mgm_configuration_iterator
    iter(* _config->m_configValues, CFG_SECTION_NODE);

  if(iter.find(CFG_NODE_ID, _ownNodeId) != 0){
    NdbMutex_Unlock(m_configMutex);
    return;
  }

  const char * tmp;
  char errStr[100];
  int err= 0;
  BaseString logdest;
  char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId);
  NdbAutoPtr<char> tmp_aptr(clusterLog);

  if(iter.get(CFG_LOG_DESTINATION, &tmp) == 0){
    logdest.assign(tmp);
  }
  NdbMutex_Unlock(m_configMutex);

  if(logdest.length() == 0 || logdest == "") {
    logdest.assfmt("FILE:filename=%s,maxsize=1000000,maxfiles=6",
                   clusterLog);
  }
  errStr[0]='\0';
  if(!g_eventLogger.addHandler(logdest, &err, sizeof(errStr), errStr)) {
    ndbout << "Warning: could not add log destination \""
           << logdest.c_str() << "\". Reason: ";
    if(err)
      ndbout << strerror(err);
    if(err && errStr[0]!='\0')
      ndbout << ", ";
    if(errStr[0]!='\0')
      ndbout << errStr;
    ndbout << endl;
  }
}

/**
 * Counterpart of startEventLog(); currently does nothing.
 */
void
MgmtSrvr::stopEventLog()
{
  // Nothing yet
}

/**
 * One row of errorTable: an error code and its message text.
 */
class ErrorItem
{
public:
  int _errorCode;
  const char * _errorText;
};

/**
 * Enables, disables or toggles an event logger severity level.
 *
 * @param severity  value cast to Logger::LoggerLevel
 * @param enable    > 0 enables the level, 0 disables it, < 0 toggles it
 * @return          whether the level is enabled after the call
 */
bool
MgmtSrvr::setEventLogFilter(int severity, int enable)
{
  Logger::LoggerLevel level = (Logger::LoggerLevel)severity;
  if (enable > 0) {
    g_eventLogger.enable(level);
  } else if (enable == 0) {
    g_eventLogger.disable(level);
  } else if (g_eventLogger.isEnable(level)) {
    g_eventLogger.disable(level);
  } else {
    g_eventLogger.enable(level);
  }
  return g_eventLogger.isEnable(level);
}

/**
 * @return whether the given severity (cast to Logger::LoggerLevel) is
 *         enabled in the event logger
 */
bool
MgmtSrvr::isEventLogFilterEnabled(int severity)
{
  return g_eventLogger.isEnable((Logger::LoggerLevel)severity);
}

/**
 * Message text for each MgmtSrvr error code.
 */
static ErrorItem errorTable[] =
{
  {MgmtSrvr::NO_CONTACT_WITH_PROCESS, "No contact with the process (dead ?)."},
  {MgmtSrvr::PROCESS_NOT_CONFIGURED, "The process is not configured."},
  {MgmtSrvr::WRONG_PROCESS_TYPE,
   "The process has wrong type. Expected a DB process."},
  {MgmtSrvr::COULD_NOT_ALLOCATE_MEMORY, "Could not allocate memory."},
  {MgmtSrvr::SEND_OR_RECEIVE_FAILED, "Send to process or receive failed."},
  {MgmtSrvr::INVALID_LEVEL, "Invalid level. Should be between 1 and 30."},
  {MgmtSrvr::INVALID_ERROR_NUMBER, "Invalid error number. Should be >= 0."},
  {MgmtSrvr::INVALID_TRACE_NUMBER, "Invalid trace number."},
  {MgmtSrvr::NOT_IMPLEMENTED, "Not implemented."},
  {MgmtSrvr::INVALID_BLOCK_NAME, "Invalid block name"},

  {MgmtSrvr::CONFIG_PARAM_NOT_EXIST,
   "The configuration parameter does not exist for the process type."},
  {MgmtSrvr::CONFIG_PARAM_NOT_UPDATEABLE,
   "The configuration parameter is not possible to update."},
  {MgmtSrvr::VALUE_WRONG_FORMAT_INT_EXPECTED,
   "Incorrect value. Expected integer."},
  {MgmtSrvr::VALUE_TOO_LOW, "Value is too low."},
  {MgmtSrvr::VALUE_TOO_HIGH, "Value is too high."},
  {MgmtSrvr::VALUE_WRONG_FORMAT_BOOL_EXPECTED,
   "Incorrect value. Expected TRUE or FALSE."},

  {MgmtSrvr::CONFIG_FILE_OPEN_WRITE_ERROR,
   "Could not open configuration file for writing."},
  {MgmtSrvr::CONFIG_FILE_OPEN_READ_ERROR,
   "Could not open configuration file for reading."},
  {MgmtSrvr::CONFIG_FILE_WRITE_ERROR,
   "Write error when writing configuration file."},
  {MgmtSrvr::CONFIG_FILE_READ_ERROR,
   "Read error when reading configuration file."},
  {MgmtSrvr::CONFIG_FILE_CLOSE_ERROR, "Could not close configuration file."},

  {MgmtSrvr::CONFIG_CHANGE_REFUSED_BY_RECEIVER,
   "The change was refused by the receiving process."},
  {MgmtSrvr::COULD_NOT_SYNC_CONFIG_CHANGE_AGAINST_PHYSICAL_MEDIUM,
   "The change could not be synced against physical medium."},
  {MgmtSrvr::CONFIG_FILE_CHECKSUM_ERROR,
   "The config file is corrupt. Checksum error."},
  {MgmtSrvr::NOT_POSSIBLE_TO_SEND_CONFIG_UPDATE_TO_PROCESS_TYPE,
   "It is not possible to send an update of a configuration variable "
   "to this kind of process."},
  {MgmtSrvr::NODE_SHUTDOWN_IN_PROGESS, "Node shutdown in progress" },
  {MgmtSrvr::SYSTEM_SHUTDOWN_IN_PROGRESS, "System shutdown in progress" },
  {MgmtSrvr::NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH,
   "Node shutdown would cause system crash" },
  {MgmtSrvr::UNSUPPORTED_NODE_SHUTDOWN,
   "Unsupported multi node shutdown. Abort option required." },
  {MgmtSrvr::NODE_NOT_API_NODE, "The specified node is not an API node." },
  {MgmtSrvr::OPERATION_NOT_ALLOWED_START_STOP,
   "Operation not allowed while nodes are starting or stopping."},
  {MgmtSrvr::NO_CONTACT_WITH_DB_NODES, "No contact with database nodes" }
};

/**
 * Maps a StopRef error code to the corresponding MgmtSrvr error code.
 *
 * @param errCode  StopRef error code
 * @return         the matching MgmtSrvr code, or 4999 for any code that
 *                 is not listed below
 */
int MgmtSrvr::translateStopRef(Uint32 errCode)
{
  switch(errCode){
  case StopRef::NodeShutdownInProgress:
    return NODE_SHUTDOWN_IN_PROGESS;
  case StopRef::SystemShutdownInProgress:
    return SYSTEM_SHUTDOWN_IN_PROGRESS;
  case StopRef::NodeShutdownWouldCauseSystemCrash:
    return NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH;
  case StopRef::UnsupportedNodeShutdown:
    return UNSUPPORTED_NODE_SHUTDOWN;
  }
  return 4999;
}

/** Number of rows in errorTable. */
static int noOfErrorCodes = sizeof(errorTable) / sizeof(ErrorItem);

/**
 * Counts the nodes of the given type by iterating with getNextNodeId()
 * starting from node id 0.
 */
int MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const
{
  int count = 0;
  NodeId nodeId = 0;

  while (getNextNodeId(&nodeId, type)) {
    count++;
  }
  return count;
}

/**
 * Looks up CFG_MGM_PORT in this node's configuration section.
 *
 * @return the configured port, or 0 when the config mutex cannot be
 *         locked, the own node's section is missing, the section is not
 *         of type NODE_TYPE_MGM, or no port is configured. The last
 *         three cases also print a diagnostic on ndbout.
 */
int
MgmtSrvr::getPort() const
{
  if(NdbMutex_Lock(m_configMutex))
    return 0;

  ndb_mgm_configuration_iterator
    iter(* _config->m_configValues, CFG_SECTION_NODE);

  if(iter.find(CFG_NODE_ID, getOwnNodeId()) != 0){
    ndbout << "Could not retrieve configuration for Node "
           << getOwnNodeId() << " in config file." << endl
           << "Have you set correct NodeId for this node?" << endl;
    NdbMutex_Unlock(m_configMutex);
    return 0;
  }

  unsigned type;
  if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0 || type != NODE_TYPE_MGM){
    ndbout << "Local node id " << getOwnNodeId()
           << " is not defined as management server" << endl
           << "Have you set correct NodeId for this node?" << endl;
    NdbMutex_Unlock(m_configMutex);
    return 0;
  }

  Uint32 port = 0;
  if(iter.get(CFG_MGM_PORT, &port) != 0){
    ndbout << "Could not find PortNumber in the configuration file." << endl;
    NdbMutex_Unlock(m_configMutex);
    return 0;
  }

  NdbMutex_Unlock(m_configMutex);

  return port;
}

/**
 * @return 0 when _ownNodeId is greater than 0, -1 otherwise
 */
int MgmtSrvr::init()
{
  if ( _ownNodeId > 0)
    return 0;
  return -1;
}

/* Constructor */
/**
 * If a connect string is given, or no config file name is given, first
 * tries to connect to another management server, allocate a node id
 * there and fetch the configuration from it. If no node id was obtained
 * that way the configuration is read locally. A failure in either path
 * terminates the process.
 *
 * @param socket_server    stored in m_socket_server
 * @param config_filename  stored in m_configFilename when non-NULL
 * @param connect_string   passed to the ConfigRetriever
 */
MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
                   const char *config_filename,
                   const char *connect_string) :
  _blockNumber(1), // Hard coded block number since it makes it easy to send
                   // signals to other management servers.
  m_socket_server(socket_server),
  _ownReference(0),
  theSignalIdleList(NULL),
  theWaitState(WAIT_SUBSCRIBE_CONF),
  m_local_mgm_handle(0),
  m_event_listner(this),
  m_master_node(0)
{

  DBUG_ENTER("MgmtSrvr::MgmtSrvr");

  _ownNodeId= 0;

  _config     = NULL;

  _isStopThread        = false;
  _logLevelThread      = NULL;
  _logLevelThreadSleep = 500;

  theFacade = 0;

  m_newConfig = NULL;
  if (config_filename)
    m_configFilename.assign(config_filename);

  m_nextConfigGenerationNumber = 0;

  m_config_retriever= new ConfigRetriever(connect_string,
                                          NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
  // if connect_string explicitly given or
  // no config filename is given then
  // first try to allocate nodeid from another management server
  if ((connect_string || config_filename == NULL) &&
      (m_config_retriever->do_connect(0,0,0) == 0))
  {
    int tmp_nodeid= 0;
    tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
    if (tmp_nodeid == 0)
    {
      // The error text is data, not a format: never pass it as the
      // format argument, or any '%' in it would be interpreted.
      ndbout_c("%s", m_config_retriever->getErrorString());
      require(false);
    }
    // read config from other management server
    _config= fetchConfig();
    if (_config == 0)
    {
      ndbout << m_config_retriever->getErrorString() << endl;
      require(false);
    }
    _ownNodeId= tmp_nodeid;
  }

  if (_ownNodeId == 0)
  {
    // read config locally
    _config= readConfig();
    if (_config == 0) {
      ndbout << "Unable to read config file" << endl;
      exit(-1);
    }
  }

  theMgmtWaitForResponseCondPtr = NdbCondition_Create();

  m_configMutex = NdbMutex_Create();

  /**
   * Fill the nodeTypes array
   */
  for(Uint32 i = 0; i<MAX_NODES; i++) {
    nodeTypes[i] = (enum ndb_mgm_node_type)-1;
    m_connect_address[i].s_addr= 0;
  }

  {
    ndb_mgm_configuration_iterator
      iter(* _config->m_configValues, CFG_SECTION_NODE);

    for(iter.first(); iter.valid(); iter.next()){
      unsigned type, id;
      if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0)
        continue;

      if(iter.get(CFG_NODE_ID, &id) != 0)
        continue;

      MGM_REQUIRE(id < MAX_NODES);

      switch(type){
      case NODE_TYPE_DB:
        nodeTypes[id] = NDB_MGM_NODE_TYPE_NDB;
        break;
      case NODE_TYPE_API:
        nodeTypes[id] = NDB_MGM_NODE_TYPE_API;
        break;
      // NOTE(review): the visible text ends here, inside this switch; the
      // rest of the constructor is not part of this excerpt.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -