⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hod.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 2 页
字号:
              self.__opCode = 1              raise Exception(e)          elif self.__opCode == 12:            self.__log.critical("Cluster %s already allocated." % clusterDir)          elif self.__opCode == 10:            self.__log.critical("dead\t%s\t%s" % (clusterInfo['jobid'],                                                   clusterDir))          elif self.__opCode == 13:            self.__log.warn("hdfs dead\t%s\t%s" % (clusterInfo['jobid'],                                                        clusterDir))          elif self.__opCode == 14:            self.__log.warn("mapred dead\t%s\t%s" % (clusterInfo['jobid'],                                                      clusterDir))                       if self.__opCode > 0 and self.__opCode != 15:            self.__log.critical("Cannot allocate cluster %s" % clusterDir)    else:      print self.__hodhelp.help(operation)      self.__log.critical("%s operation requires two arguments. "  % operation                        + "A cluster directory and a nodecount.")      self.__opCode = 3   def _is_cluster_allocated(self, clusterDir):    if os.path.isdir(clusterDir):      self.__setup_cluster_state(clusterDir)      clusterInfo = self.__clusterState.read()      if clusterInfo != {}:        return True    return False  def _op_deallocate(self, args):    operation = "deallocate"    argLength = len(args)    if argLength == 2:      clusterDir = self.__norm_cluster_dir(args[1])      if os.path.isdir(clusterDir):        self.__setup_cluster_state(clusterDir)        clusterInfo = self.__clusterState.read()        if clusterInfo == {}:          self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)        else:          self.__opCode = \            self.__cluster.deallocate(clusterDir, clusterInfo)          # irrespective of whether deallocate failed or not\          # remove the cluster state.          self.__clusterState.clear()          if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):            self.__log.critical(INVALID_STATE_FILE_MSGS[3] % \                               self.__userState.get_state_file())            self.__opCode = 1            return          self.__remove_cluster(clusterDir)      else:        self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)    else:      print self.__hodhelp.help(operation)      self.__log.critical("%s operation requires one argument. "  % operation                        + "A cluster path.")      self.__opCode = 3              def _op_list(self, args):    operation = 'list'    clusterList = self.__userState.read(CLUSTER_DATA_FILE)    for path in clusterList.keys():      if not os.path.isdir(path):        self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))        continue      self.__setup_cluster_state(path)      clusterInfo = self.__clusterState.read()      if clusterInfo == {}:        # something wrong with the cluster directory.        self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))        continue      clusterStatus = self.__cluster.check_cluster(clusterInfo)      if clusterStatus == 12:        self.__log.info("alive\t%s\t%s" % (clusterList[path], path))      elif clusterStatus == 10:        self.__log.info("dead\t%s\t%s" % (clusterList[path], path))      elif clusterStatus == 13:        self.__log.info("hdfs dead\t%s\t%s" % (clusterList[path], path))      elif clusterStatus == 14:        self.__log.info("mapred dead\t%s\t%s" % (clusterList[path], path))               def _op_info(self, args):    operation = 'info'    argLength = len(args)      if argLength == 2:      clusterDir = self.__norm_cluster_dir(args[1])      if os.path.isdir(clusterDir):        self.__setup_cluster_state(clusterDir)        clusterInfo = self.__clusterState.read()        if clusterInfo == {}:          # something wrong with the cluster directory.          self.__handle_invalid_cluster_directory(clusterDir)        else:          clusterStatus = self.__cluster.check_cluster(clusterInfo)          if clusterStatus == 12:            self.__print_cluster_info(clusterInfo)            self.__log.info("hadoop-site.xml at %s" % clusterDir)          elif clusterStatus == 10:            self.__log.critical("%s cluster is dead" % clusterDir)          elif clusterStatus == 13:            self.__log.warn("%s cluster hdfs is dead" % clusterDir)          elif clusterStatus == 14:            self.__log.warn("%s cluster mapred is dead" % clusterDir)          if clusterStatus != 12:            if clusterStatus == 15:              self.__log.critical("Cluster %s not allocated." % clusterDir)            else:              self.__print_cluster_info(clusterInfo)              self.__log.info("hadoop-site.xml at %s" % clusterDir)                        self.__opCode = clusterStatus      else:        self.__handle_invalid_cluster_directory(clusterDir)    else:      print self.__hodhelp.help(operation)      self.__log.critical("%s operation requires one argument. "  % operation                        + "A cluster path.")      self.__opCode = 3        def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):    if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):      self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \                           self.__userState.get_state_file())      self.__opCode = 1      return    clusterList = self.__userState.read(CLUSTER_DATA_FILE)    if clusterDir in clusterList.keys():      # previously allocated cluster.      self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))      if cleanUp:        self.__cluster.delete_job(clusterList[clusterDir])        self.__log.critical("Freeing resources allocated to the cluster.")        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):          self.__log.critical(INVALID_STATE_FILE_MSGS[1] % \                              self.__userState.get_state_file())          self.__opCode = 1          return        self.__remove_cluster(clusterDir)      self.__opCode = 3    else:      if not os.path.exists(clusterDir):        self.__log.critical(  \                  "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \                  clusterDir + " : No such directory")      elif not os.path.isdir(clusterDir):        self.__log.critical( \                  "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \                  clusterDir + " : Not a directory")      else:        self.__log.critical( \                  "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \                  clusterDir + " : Not tied to any allocated cluster.")      self.__opCode = 15      def __print_cluster_info(self, clusterInfo):    keys = clusterInfo.keys()    _dict = {               'jobid' : 'Cluster Id', 'min' : 'Nodecount',              'hdfs' : 'HDFS UI at' , 'mapred' : 'Mapred UI at'            }    for key in _dict.keys():      if clusterInfo.has_key(key):        self.__log.info("%s %s" % (_dict[key], clusterInfo[key]))    if clusterInfo.has_key('ring'):      self.__log.debug("%s\t%s" % ('Ringmaster at ', clusterInfo['ring']))        if self.__cfg['hod']['debug'] == 4:      for var in clusterInfo['env'].keys():        self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))  def _op_help(self, arg):    if arg == None or arg.__len__() != 2:      print "hod commands:\n"      for op in self.__ops:        print self.__hodhelp.help(op)    else:      if arg[1] not in self.__ops:        print self.__hodhelp.help('help')        self.__log.critical("Help requested for invalid operation : %s"%arg[1])        self.__opCode = 3      else: print self.__hodhelp.help(arg[1])  def operation(self):      operation = self.__cfg['hod']['operation']    try:      opList = self.__check_operation(operation)      if self.__opCode == 0:        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):           self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \                         self.__userState.get_state_file())           self.__opCode = 1           return self.__opCode        getattr(self, "_op_%s" % opList[0])(opList)    except HodInterruptException, h:      self.__log.critical("op: %s failed because of a process interrupt." \                                                                % operation)      self.__opCode = HOD_INTERRUPTED_CODE    except:      self.__log.critical("op: %s failed: %s" % (operation,                          get_exception_error_string()))      self.__log.debug(get_exception_string())        self.__cleanup()        self.__log.debug("return code: %s" % self.__opCode)        return self.__opCode    def script(self):    errorFlag = False    errorMsgs = []    scriptRet = 0 # return from the script, if run        script = self.__cfg['hod']['script']    nodes = self.__cfg['hod']['nodecount']    clusterDir = self.__cfg['hod']['clusterdir']        if not os.path.exists(script):      errorFlag = True      errorMsgs.append("Invalid script file (--hod.script or -s) : " + \                       script + " : No such file")    elif not os.path.isfile(script):      errorFlag = True      errorMsgs.append("Invalid script file (--hod.script or -s) : " + \                       script + " : Not a file.")    else:      isExecutable = os.access(script, os.X_OK)      if not isExecutable:        errorFlag = True        errorMsgs.append("Invalid script file (--hod.script or -s) : " + \                         script + " : Not an executable.")    if not os.path.exists(clusterDir):      try:        os.makedirs(clusterDir)      except OSError, err:        errorFlag = True        errorMsgs.append("Could not create cluster directory. %s" % (str(err)))    elif not os.path.isdir(clusterDir):      errorFlag = True      errorMsgs.append( \                  "Invalid cluster directory (--hod.clusterdir or -d) : " + \                       clusterDir + " : Not a directory")    if int(self.__cfg['hod']['nodecount']) < 3 :      errorFlag = True      errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \                       "Must be >= 3. Given nodes: %s" % nodes)    if errorFlag:      for msg in errorMsgs:        self.__log.critical(msg)      self.handle_script_exit_code(scriptRet, clusterDir)      sys.exit(3)    try:      self._op_allocate(('allocate', clusterDir, str(nodes)))      if self.__opCode == 0:        if self.__cfg['hod'].has_key('script-wait-time'):          time.sleep(self.__cfg['hod']['script-wait-time'])          self.__log.debug('Slept for %d time. Now going to run the script' % self.__cfg['hod']['script-wait-time'])        if hodInterrupt.isSet():          self.__log.debug('Hod interrupted - not executing script')        else:          scriptRunner = hadoopScript(clusterDir,                                   self.__cfg['hod']['original-dir'])          self.__opCode = scriptRunner.run(script)          scriptRet = self.__opCode          self.__log.info("Exit code from running the script: %d" % self.__opCode)      else:        self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode)      if hodInterrupt.isSet():        # Got interrupt while executing script. Unsetting it for deallocating        hodInterrupt.setFlag(False)      if self._is_cluster_allocated(clusterDir):        self._op_deallocate(('deallocate', clusterDir))    except HodInterruptException, h:      self.__log.critical("Script failed because of a process interrupt.")      self.__opCode = HOD_INTERRUPTED_CODE    except:      self.__log.critical("script: %s failed: %s" % (script,                          get_exception_error_string()))      self.__log.debug(get_exception_string())        self.__cleanup()    self.handle_script_exit_code(scriptRet, clusterDir)        return self.__opCode  def handle_script_exit_code(self, scriptRet, clusterDir):    # We want to give importance to a failed script's exit code, and write out exit code to a file separately    # so users can easily get it if required. This way they can differentiate between the script's exit code    # and hod's exit code.    if os.path.exists(clusterDir):      exit_code_file_name = (os.path.join(clusterDir, 'script.exitcode'))      if scriptRet != 0:        exit_code_file = open(exit_code_file_name, 'w')        print >>exit_code_file, scriptRet        exit_code_file.close()        self.__opCode = scriptRet      else:        #ensure script exit code file is not there:        if (os.path.exists(exit_code_file_name)):          os.remove(exit_code_file_name)class hodHelp:  def __init__(self):    self.ops = ['allocate', 'deallocate', 'info', 'list','script',  'help']    self.usage_strings = \      {        'allocate'   : 'hod allocate -d <clusterdir> -n <nodecount> [OPTIONS]',        'deallocate' : 'hod deallocate -d <clusterdir> [OPTIONS]',        'list'       : 'hod list [OPTIONS]',        'info'       : 'hod info -d <clusterdir> [OPTIONS]',        'script'     :              'hod script -d <clusterdir> -n <nodecount> -s <script> [OPTIONS]',        'help'       : 'hod help <OPERATION>',        }    self.description_strings = \      {       'allocate' : "Allocates a cluster of n nodes using the specified \n" + \      "              cluster directory to store cluster state \n" + \      "              information. The Hadoop site XML is also stored \n" + \      "              in this location.\n",       'deallocate' : "Deallocates a cluster using the specified \n" + \      "             cluster directory.  This operation is also \n" + \      "             required to clean up a dead cluster.\n",       'list' : "List all clusters currently allocated by a user, \n" + \      "              along with limited status information and the \n" + \      "              cluster ID.\n",       'info' : "Provide detailed information on an allocated cluster.\n",       'script' : "Allocates a cluster of n nodes with the given \n" +\           "              cluster directory, runs the specified script \n" + \           "              using the allocated cluster, and then \n" + \           "              deallocates the cluster.\n",        'help' : "Print help for the operation and exit.\n" + \                "Available operations : %s.\n" % self.ops,       }  def usage(self, op):    return "Usage       : " + self.usage_strings[op] + "\n" + \           "For full description: hod help " + op + ".\n"  def help(self, op=None):    if op is None:      return "hod <operation> [ARGS] [OPTIONS]\n" + \             "Available operations : %s\n" % self.ops + \             "For help on a particular operation : hod help <operation>.\n" + \             "For all options : hod help options."    else:      return "Usage       : " + self.usage_strings[op] + "\n" + \             "Description : " + self.description_strings[op] + \             "For all options : hod help options.\n"

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -