📄 commandinterpreter.cpp
字号:
//*****************************************************************************//*****************************************************************************static int do_event_thread;static void*event_thread_run(void* m){ DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, 0 }; int fd = ndb_mgm_listen_event(handle, filter); if (fd != NDB_INVALID_SOCKET) { do_event_thread= 1; char *tmp= 0; char buf[1024]; SocketInputStream in(fd,10); do { if (tmp == 0) NdbSleep_MilliSleep(10); if((tmp = in.gets(buf, 1024))) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) ndbout << tmp; } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } else { do_event_thread= -1; } DBUG_RETURN(NULL);}boolCommandInterpreter::connect() { DBUG_ENTER("CommandInterpreter::connect"); if(!m_connected) { if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) { const char *host= ndb_mgm_get_connected_host(m_mgmsrv); unsigned port= ndb_mgm_get_connected_port(m_mgmsrv); BaseString constr; constr.assfmt("%s:%d",host,port); if(!ndb_mgm_set_connectstring(m_mgmsrv2, constr.c_str()) && !ndb_mgm_connect(m_mgmsrv2, try_reconnect-1, 5, 1)) { DBUG_PRINT("info",("2:ndb connected to Management Server ok at: %s:%d", host, port)); assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; m_event_thread = NdbThread_Create(event_thread_run, (void**)&m_mgmsrv2, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); if (m_event_thread != 0) { DBUG_PRINT("info",("Thread created ok, waiting for started...")); int iter= 1000; // try for 30 seconds while(do_event_thread == 0 && iter-- > 0) NdbSleep_MilliSleep(30); } if (m_event_thread == 0 || do_event_thread == 0 || do_event_thread == -1) { DBUG_PRINT("info",("Warning, event thread startup failed, " "degraded printouts as result, errno=%d", errno)); printf("Warning, event thread startup failed, " "degraded printouts as result, errno=%d\n", errno); do_event_thread= 0; if (m_event_thread) { void *res; NdbThread_WaitFor(m_event_thread, &res); NdbThread_Destroy(&m_event_thread); } ndb_mgm_disconnect(m_mgmsrv2); } } else { DBUG_PRINT("warning", ("Could not do 2:nd connect to mgmtserver for event listening")); DBUG_PRINT("info", ("code: %d, msg: %s", ndb_mgm_get_latest_error(m_mgmsrv2), ndb_mgm_get_latest_error_msg(m_mgmsrv2))); printf("Warning, event connect failed, degraded printouts as result\n"); printf("code: %d, msg: %s\n", ndb_mgm_get_latest_error(m_mgmsrv2), ndb_mgm_get_latest_error_msg(m_mgmsrv2)); } m_connected= true; DBUG_PRINT("info",("Connected to Management Server at: %s:%d", host, port)); if (m_verbose) { printf("Connected to Management Server at: %s:%d\n", host, port); } } } DBUG_RETURN(m_connected);}bool CommandInterpreter::disconnect() { DBUG_ENTER("CommandInterpreter::disconnect"); if (m_event_thread) { void *res; do_event_thread= 0; NdbThread_WaitFor(m_event_thread, &res); NdbThread_Destroy(&m_event_thread); m_event_thread= 0; ndb_mgm_disconnect(m_mgmsrv2); } if (m_connected) { if (ndb_mgm_disconnect(m_mgmsrv) == -1) { ndbout_c("Could not disconnect from management server"); printError(); } m_connected= false; } DBUG_RETURN(true);}//*****************************************************************************//*****************************************************************************int CommandInterpreter::execute(const char *_line, int _try_reconnect, int *error) { if (_try_reconnect >= 0) try_reconnect=_try_reconnect; int result= execute_impl(_line); if (error) *error= m_error; return result;}static voidinvalid_command(const char *cmd){ ndbout << "Invalid command: " << cmd << endl; ndbout << "Type HELP for help." << endl << endl;}int CommandInterpreter::execute_impl(const char *_line) { DBUG_ENTER("CommandInterpreter::execute_impl"); DBUG_PRINT("enter",("line=\"%s\"",_line)); m_error= 0; char * line; if(_line == NULL) { DBUG_RETURN(false); } line = my_strdup(_line,MYF(MY_WME)); My_auto_ptr<char> ptr(line); int do_continue; do { do_continue= 0; BaseString::trim(line," \t"); if (line[0] == 0 || line[0] == '#') { DBUG_RETURN(true); } // for mysql client compatability remove trailing ';' { unsigned last= strlen(line)-1; if (line[last] == ';') { line[last]= 0; do_continue= 1; } } } while (do_continue); // if there is anything in the line proceed Vector<BaseString> command_list; { BaseString tmp(line); tmp.split(command_list); for (unsigned i= 0; i < command_list.size();) command_list[i].c_str()[0] ? i++ : (command_list.erase(i),0); } char* firstToken = strtok(line, " "); char* allAfterFirstToken = strtok(NULL, ""); if (strcasecmp(firstToken, "HELP") == 0 || strcasecmp(firstToken, "?") == 0) { executeHelp(allAfterFirstToken); DBUG_RETURN(true); } else if (strcasecmp(firstToken, "CONNECT") == 0) { executeConnect(allAfterFirstToken); DBUG_RETURN(true); } else if (strcasecmp(firstToken, "SLEEP") == 0) { if (allAfterFirstToken) sleep(atoi(allAfterFirstToken)); DBUG_RETURN(true); } else if((strcasecmp(firstToken, "QUIT") == 0 || strcasecmp(firstToken, "EXIT") == 0 || strcasecmp(firstToken, "BYE") == 0) && allAfterFirstToken == NULL){ DBUG_RETURN(false); } if (!connect()) DBUG_RETURN(true); if (strcasecmp(firstToken, "SHOW") == 0) { executeShow(allAfterFirstToken); DBUG_RETURN(true); } else if (strcasecmp(firstToken, "SHUTDOWN") == 0) { m_error= executeShutdown(allAfterFirstToken); DBUG_RETURN(true); } else if (strcasecmp(firstToken, "CLUSTERLOG") == 0){ executeClusterLog(allAfterFirstToken); DBUG_RETURN(true); } else if(strcasecmp(firstToken, "START") == 0 && allAfterFirstToken != NULL && strncasecmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){ m_error= executeStartBackup(allAfterFirstToken); DBUG_RETURN(true); } else if(strcasecmp(firstToken, "ABORT") == 0 && allAfterFirstToken != NULL && strncasecmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){ executeAbortBackup(allAfterFirstToken); DBUG_RETURN(true); } else if (strcasecmp(firstToken, "PURGE") == 0) { executePurge(allAfterFirstToken); DBUG_RETURN(true); } #ifdef HAVE_GLOBAL_REPLICATION else if(strcasecmp(firstToken, "REPLICATION") == 0 || strcasecmp(firstToken, "REP") == 0) { executeRep(allAfterFirstToken); DBUG_RETURN(true); }#endif // HAVE_GLOBAL_REPLICATION else if(strcasecmp(firstToken, "ENTER") == 0 && allAfterFirstToken != NULL && strncasecmp(allAfterFirstToken, "SINGLE USER MODE ", sizeof("SINGLE USER MODE") - 1) == 0){ executeEnterSingleUser(allAfterFirstToken); DBUG_RETURN(true); } else if(strcasecmp(firstToken, "EXIT") == 0 && allAfterFirstToken != NULL && strncasecmp(allAfterFirstToken, "SINGLE USER MODE ", sizeof("SINGLE USER MODE") - 1) == 0){ executeExitSingleUser(allAfterFirstToken); DBUG_RETURN(true); } else if (strcasecmp(firstToken, "ALL") == 0) { analyseAfterFirstToken(-1, allAfterFirstToken); } else { /** * First tokens should be digits, node ID's */ int node_ids[MAX_NODES]; unsigned pos; for (pos= 0; pos < command_list.size(); pos++) { int node_id; if (convert(command_list[pos].c_str(), node_id)) { if (node_id <= 0) { ndbout << "Invalid node ID: " << command_list[pos].c_str() << "." << endl; DBUG_RETURN(true); } node_ids[pos]= node_id; continue; } break; } int no_of_nodes= pos; if (no_of_nodes == 0) { /* No digit found */ invalid_command(_line); DBUG_RETURN(true); } if (pos == command_list.size()) { /* No command found */ invalid_command(_line); DBUG_RETURN(true); } if (no_of_nodes == 1) { analyseAfterFirstToken(node_ids[0], allAfterFirstToken); DBUG_RETURN(true); } executeCommand(command_list, pos, node_ids, no_of_nodes); DBUG_RETURN(true); } DBUG_RETURN(true);}/** * List of commands used as second command argument */static const CommandInterpreter::CommandFunctionPair commands[] = { { "START", &CommandInterpreter::executeStart } ,{ "RESTART", &CommandInterpreter::executeRestart } ,{ "STOP", &CommandInterpreter::executeStop } ,{ "STATUS", &CommandInterpreter::executeStatus } ,{ "LOGLEVEL", &CommandInterpreter::executeLogLevel } ,{ "CLUSTERLOG", &CommandInterpreter::executeEventReporting }#ifdef ERROR_INSERT ,{ "ERROR", &CommandInterpreter::executeError }#endif ,{ "LOG", &CommandInterpreter::executeLog } ,{ "LOGIN", &CommandInterpreter::executeLogIn } ,{ "LOGOUT", &CommandInterpreter::executeLogOut } ,{ "LOGOFF", &CommandInterpreter::executeLogOff } ,{ "TESTON", &CommandInterpreter::executeTestOn } ,{ "TESTOFF", &CommandInterpreter::executeTestOff } ,{ "SET", &CommandInterpreter::executeSet } ,{ "GETSTAT", &CommandInterpreter::executeGetStat } ,{ "DUMP", &CommandInterpreter::executeDumpState }};//*****************************************************************************//*****************************************************************************voidCommandInterpreter::analyseAfterFirstToken(int processId, char* allAfterFirstToken) { if (emptyString(allAfterFirstToken)) { ndbout << "Expected a command after " << ((processId == -1) ? "ALL." : "node ID.") << endl; return; } char* secondToken = strtok(allAfterFirstToken, " "); char* allAfterSecondToken = strtok(NULL, "\0"); const int tmpSize = sizeof(commands)/sizeof(CommandFunctionPair); ExecuteFunction fun = 0; const char * command = 0; for(int i = 0; i<tmpSize; i++){ if(strcasecmp(secondToken, commands[i].command) == 0){ fun = commands[i].executeFunction; command = commands[i].command; break; } } if(fun == 0){ invalid_command(secondToken); return; } if(processId == -1){ executeForAll(command, fun, allAfterSecondToken); } else { (this->*fun)(processId, allAfterSecondToken, false); } ndbout << endl;}voidCommandInterpreter::executeCommand(Vector<BaseString> &command_list, unsigned command_pos, int *node_ids, int no_of_nodes){ const char *cmd= command_list[command_pos].c_str(); if (strcasecmp("STOP", cmd) == 0) { executeStop(command_list, command_pos+1, node_ids, no_of_nodes); return; } if (strcasecmp("RESTART", cmd) == 0) { executeRestart(command_list, command_pos+1, node_ids, no_of_nodes); return; } ndbout_c("Invalid command: '%s' after multi node id list. " "Expected STOP or RESTART.", cmd); return;}/** * Get next nodeid larger than the give node_id. node_id will be * set to the next node_id in the list. node_id should be set * to 0 (zero) on the first call. * * @param handle the NDB management handle * @param node_id last node_id retreived, 0 at first call * @param type type of node to look for * @return 1 if a node was found, 0 if no more node exist */static int get_next_nodeid(struct ndb_mgm_cluster_state *cl, int *node_id, enum ndb_mgm_node_type type){ int i; if(cl == NULL) return 0; i=0; while((i < cl->no_of_nodes)) { if((*node_id < cl->node_states[i].node_id) && (cl->node_states[i].node_type == type)) { if(i >= cl->no_of_nodes) return 0; *node_id = cl->node_states[i].node_id; return 1; } i++; } return 0;}voidCommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, const char * allAfterSecondToken){ int nodeId = 0; if(strcasecmp(cmd, "STOP") == 0) { ndbout_c("Executing STOP on all nodes."); (this->*fun)(nodeId, allAfterSecondToken, true); } else if(strcasecmp(cmd, "RESTART") == 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -