📄 hod.py
字号:
self.__opCode = 1 raise Exception(e) elif self.__opCode == 12: self.__log.critical("Cluster %s already allocated." % clusterDir) elif self.__opCode == 10: self.__log.critical("dead\t%s\t%s" % (clusterInfo['jobid'], clusterDir)) elif self.__opCode == 13: self.__log.warn("hdfs dead\t%s\t%s" % (clusterInfo['jobid'], clusterDir)) elif self.__opCode == 14: self.__log.warn("mapred dead\t%s\t%s" % (clusterInfo['jobid'], clusterDir)) if self.__opCode > 0 and self.__opCode != 15: self.__log.critical("Cannot allocate cluster %s" % clusterDir) else: print self.__hodhelp.help(operation) self.__log.critical("%s operation requires two arguments. " % operation + "A cluster directory and a nodecount.") self.__opCode = 3 def _is_cluster_allocated(self, clusterDir): if os.path.isdir(clusterDir): self.__setup_cluster_state(clusterDir) clusterInfo = self.__clusterState.read() if clusterInfo != {}: return True return False def _op_deallocate(self, args): operation = "deallocate" argLength = len(args) if argLength == 2: clusterDir = self.__norm_cluster_dir(args[1]) if os.path.isdir(clusterDir): self.__setup_cluster_state(clusterDir) clusterInfo = self.__clusterState.read() if clusterInfo == {}: self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True) else: self.__opCode = \ self.__cluster.deallocate(clusterDir, clusterInfo) # irrespective of whether deallocate failed or not\ # remove the cluster state. self.__clusterState.clear() if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)): self.__log.critical(INVALID_STATE_FILE_MSGS[3] % \ self.__userState.get_state_file()) self.__opCode = 1 return self.__remove_cluster(clusterDir) else: self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True) else: print self.__hodhelp.help(operation) self.__log.critical("%s operation requires one argument. " % operation + "A cluster path.") self.__opCode = 3 def _op_list(self, args): operation = 'list' clusterList = self.__userState.read(CLUSTER_DATA_FILE) for path in clusterList.keys(): if not os.path.isdir(path): self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path)) continue self.__setup_cluster_state(path) clusterInfo = self.__clusterState.read() if clusterInfo == {}: # something wrong with the cluster directory. self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path)) continue clusterStatus = self.__cluster.check_cluster(clusterInfo) if clusterStatus == 12: self.__log.info("alive\t%s\t%s" % (clusterList[path], path)) elif clusterStatus == 10: self.__log.info("dead\t%s\t%s" % (clusterList[path], path)) elif clusterStatus == 13: self.__log.info("hdfs dead\t%s\t%s" % (clusterList[path], path)) elif clusterStatus == 14: self.__log.info("mapred dead\t%s\t%s" % (clusterList[path], path)) def _op_info(self, args): operation = 'info' argLength = len(args) if argLength == 2: clusterDir = self.__norm_cluster_dir(args[1]) if os.path.isdir(clusterDir): self.__setup_cluster_state(clusterDir) clusterInfo = self.__clusterState.read() if clusterInfo == {}: # something wrong with the cluster directory. self.__handle_invalid_cluster_directory(clusterDir) else: clusterStatus = self.__cluster.check_cluster(clusterInfo) if clusterStatus == 12: self.__print_cluster_info(clusterInfo) self.__log.info("hadoop-site.xml at %s" % clusterDir) elif clusterStatus == 10: self.__log.critical("%s cluster is dead" % clusterDir) elif clusterStatus == 13: self.__log.warn("%s cluster hdfs is dead" % clusterDir) elif clusterStatus == 14: self.__log.warn("%s cluster mapred is dead" % clusterDir) if clusterStatus != 12: if clusterStatus == 15: self.__log.critical("Cluster %s not allocated." % clusterDir) else: self.__print_cluster_info(clusterInfo) self.__log.info("hadoop-site.xml at %s" % clusterDir) self.__opCode = clusterStatus else: self.__handle_invalid_cluster_directory(clusterDir) else: print self.__hodhelp.help(operation) self.__log.critical("%s operation requires one argument. " % operation + "A cluster path.") self.__opCode = 3 def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False): if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)): self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \ self.__userState.get_state_file()) self.__opCode = 1 return clusterList = self.__userState.read(CLUSTER_DATA_FILE) if clusterDir in clusterList.keys(): # previously allocated cluster. self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir)) if cleanUp: self.__cluster.delete_job(clusterList[clusterDir]) self.__log.critical("Freeing resources allocated to the cluster.") if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)): self.__log.critical(INVALID_STATE_FILE_MSGS[1] % \ self.__userState.get_state_file()) self.__opCode = 1 return self.__remove_cluster(clusterDir) self.__opCode = 3 else: if not os.path.exists(clusterDir): self.__log.critical( \ "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \ clusterDir + " : No such directory") elif not os.path.isdir(clusterDir): self.__log.critical( \ "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \ clusterDir + " : Not a directory") else: self.__log.critical( \ "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \ clusterDir + " : Not tied to any allocated cluster.") self.__opCode = 15 def __print_cluster_info(self, clusterInfo): keys = clusterInfo.keys() _dict = { 'jobid' : 'Cluster Id', 'min' : 'Nodecount', 'hdfs' : 'HDFS UI at' , 'mapred' : 'Mapred UI at' } for key in _dict.keys(): if clusterInfo.has_key(key): self.__log.info("%s %s" % (_dict[key], clusterInfo[key])) if clusterInfo.has_key('ring'): self.__log.debug("%s\t%s" % ('Ringmaster at ', clusterInfo['ring'])) if self.__cfg['hod']['debug'] == 4: for var in clusterInfo['env'].keys(): self.__log.debug("%s = %s" % (var, clusterInfo['env'][var])) def _op_help(self, arg): if arg == None or arg.__len__() != 2: print "hod commands:\n" for op in self.__ops: print self.__hodhelp.help(op) else: if arg[1] not in self.__ops: print self.__hodhelp.help('help') self.__log.critical("Help requested for invalid operation : %s"%arg[1]) self.__opCode = 3 else: print self.__hodhelp.help(arg[1]) def operation(self): operation = self.__cfg['hod']['operation'] try: opList = self.__check_operation(operation) if self.__opCode == 0: if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)): self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \ self.__userState.get_state_file()) self.__opCode = 1 return self.__opCode getattr(self, "_op_%s" % opList[0])(opList) except HodInterruptException, h: self.__log.critical("op: %s failed because of a process interrupt." \ % operation) self.__opCode = HOD_INTERRUPTED_CODE except: self.__log.critical("op: %s failed: %s" % (operation, get_exception_error_string())) self.__log.debug(get_exception_string()) self.__cleanup() self.__log.debug("return code: %s" % self.__opCode) return self.__opCode def script(self): errorFlag = False errorMsgs = [] scriptRet = 0 # return from the script, if run script = self.__cfg['hod']['script'] nodes = self.__cfg['hod']['nodecount'] clusterDir = self.__cfg['hod']['clusterdir'] if not os.path.exists(script): errorFlag = True errorMsgs.append("Invalid script file (--hod.script or -s) : " + \ script + " : No such file") elif not os.path.isfile(script): errorFlag = True errorMsgs.append("Invalid script file (--hod.script or -s) : " + \ script + " : Not a file.") else: isExecutable = os.access(script, os.X_OK) if not isExecutable: errorFlag = True errorMsgs.append("Invalid script file (--hod.script or -s) : " + \ script + " : Not an executable.") if not os.path.exists(clusterDir): try: os.makedirs(clusterDir) except OSError, err: errorFlag = True errorMsgs.append("Could not create cluster directory. %s" % (str(err))) elif not os.path.isdir(clusterDir): errorFlag = True errorMsgs.append( \ "Invalid cluster directory (--hod.clusterdir or -d) : " + \ clusterDir + " : Not a directory") if int(self.__cfg['hod']['nodecount']) < 3 : errorFlag = True errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \ "Must be >= 3. Given nodes: %s" % nodes) if errorFlag: for msg in errorMsgs: self.__log.critical(msg) self.handle_script_exit_code(scriptRet, clusterDir) sys.exit(3) try: self._op_allocate(('allocate', clusterDir, str(nodes))) if self.__opCode == 0: if self.__cfg['hod'].has_key('script-wait-time'): time.sleep(self.__cfg['hod']['script-wait-time']) self.__log.debug('Slept for %d time. Now going to run the script' % self.__cfg['hod']['script-wait-time']) if hodInterrupt.isSet(): self.__log.debug('Hod interrupted - not executing script') else: scriptRunner = hadoopScript(clusterDir, self.__cfg['hod']['original-dir']) self.__opCode = scriptRunner.run(script) scriptRet = self.__opCode self.__log.info("Exit code from running the script: %d" % self.__opCode) else: self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode) if hodInterrupt.isSet(): # Got interrupt while executing script. Unsetting it for deallocating hodInterrupt.setFlag(False) if self._is_cluster_allocated(clusterDir): self._op_deallocate(('deallocate', clusterDir)) except HodInterruptException, h: self.__log.critical("Script failed because of a process interrupt.") self.__opCode = HOD_INTERRUPTED_CODE except: self.__log.critical("script: %s failed: %s" % (script, get_exception_error_string())) self.__log.debug(get_exception_string()) self.__cleanup() self.handle_script_exit_code(scriptRet, clusterDir) return self.__opCode def handle_script_exit_code(self, scriptRet, clusterDir): # We want to give importance to a failed script's exit code, and write out exit code to a file separately # so users can easily get it if required. This way they can differentiate between the script's exit code # and hod's exit code. if os.path.exists(clusterDir): exit_code_file_name = (os.path.join(clusterDir, 'script.exitcode')) if scriptRet != 0: exit_code_file = open(exit_code_file_name, 'w') print >>exit_code_file, scriptRet exit_code_file.close() self.__opCode = scriptRet else: #ensure script exit code file is not there: if (os.path.exists(exit_code_file_name)): os.remove(exit_code_file_name)class hodHelp: def __init__(self): self.ops = ['allocate', 'deallocate', 'info', 'list','script', 'help'] self.usage_strings = \ { 'allocate' : 'hod allocate -d <clusterdir> -n <nodecount> [OPTIONS]', 'deallocate' : 'hod deallocate -d <clusterdir> [OPTIONS]', 'list' : 'hod list [OPTIONS]', 'info' : 'hod info -d <clusterdir> [OPTIONS]', 'script' : 'hod script -d <clusterdir> -n <nodecount> -s <script> [OPTIONS]', 'help' : 'hod help <OPERATION>', } self.description_strings = \ { 'allocate' : "Allocates a cluster of n nodes using the specified \n" + \ " cluster directory to store cluster state \n" + \ " information. The Hadoop site XML is also stored \n" + \ " in this location.\n", 'deallocate' : "Deallocates a cluster using the specified \n" + \ " cluster directory. This operation is also \n" + \ " required to clean up a dead cluster.\n", 'list' : "List all clusters currently allocated by a user, \n" + \ " along with limited status information and the \n" + \ " cluster ID.\n", 'info' : "Provide detailed information on an allocated cluster.\n", 'script' : "Allocates a cluster of n nodes with the given \n" +\ " cluster directory, runs the specified script \n" + \ " using the allocated cluster, and then \n" + \ " deallocates the cluster.\n", 'help' : "Print help for the operation and exit.\n" + \ "Available operations : %s.\n" % self.ops, } def usage(self, op): return "Usage : " + self.usage_strings[op] + "\n" + \ "For full description: hod help " + op + ".\n" def help(self, op=None): if op is None: return "hod <operation> [ARGS] [OPTIONS]\n" + \ "Available operations : %s\n" % self.ops + \ "For help on a particular operation : hod help <operation>.\n" + \ "For all options : hod help options." else: return "Usage : " + self.usage_strings[op] + "\n" + \ "Description : " + self.description_strings[op] + \ "For all options : hod help options.\n"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -