⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hod.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 2 页
字号:
#Licensed to the Apache Software Foundation (ASF) under one#or more contributor license agreements.  See the NOTICE file#distributed with this work for additional information#regarding copyright ownership.  The ASF licenses this file#to you under the Apache License, Version 2.0 (the#"License"); you may not use this file except in compliance#with the License.  You may obtain a copy of the License at#     http://www.apache.org/licenses/LICENSE-2.0#Unless required by applicable law or agreed to in writing, software#distributed under the License is distributed on an "AS IS" BASIS,#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#See the License for the specific language governing permissions and#limitations under the License.# -*- python -*-import sys, os, getpass, pprint, re, cPickle, random, shutil, time, errnoimport hodlib.Common.loggerfrom hodlib.ServiceRegistry.serviceRegistry import svcrgyfrom hodlib.Common.xmlrpc import hodXRClientfrom hodlib.Common.util import to_http_url, get_exception_stringfrom hodlib.Common.util import get_exception_error_stringfrom hodlib.Common.util import hodInterrupt, HodInterruptExceptionfrom hodlib.Common.util import HOD_INTERRUPTED_CODEfrom hodlib.Common.nodepoolutil import NodePoolUtilfrom hodlib.Hod.hadoop import hadoopCluster, hadoopScriptCLUSTER_DATA_FILE = 'clusters'INVALID_STATE_FILE_MSGS = \              [                "Requested operation cannot be performed. Cannot read %s: " + \                "Permission denied.",                "Requested operation cannot be performed. " + \                "Cannot write to %s: Permission denied.",                "Requested operation cannot be performed. " + \                "Cannot read/write to %s: Permission denied.",                "Cannot update %s: Permission denied. " + \                "Cluster is deallocated, but info and list " + \                "operations might show incorrect information.",              ]class hodState:  def __init__(self, store):    self.__store = store    self.__stateFile = None    self.__init_store()    self.__STORE_EXT = ".state"     def __init_store(self):    if not os.path.exists(self.__store):      os.mkdir(self.__store)    def __set_state_file(self, id=None):    if id:      self.__stateFile = os.path.join(self.__store, "%s%s" % (id,                                       self.__STORE_EXT))    else:      for item in os.listdir(self.__store):        if item.endswith(self.__STORE_EXT):            self.__stateFile = os.path.join(self.__store, item)            def get_state_file(self):    return self.__stateFile            def checkStateFile(self, id=None, modes=(os.R_OK,)):    # is state file exists/readable/writable/both?    self.__set_state_file(id)    # return true if file doesn't exist, because HOD CAN create    # state file and so WILL have permissions to read and/or write    try:      os.stat(self.__stateFile)    except OSError, err:      if err.errno == errno.ENOENT: # error 2 (no such file)        return True    # file exists    ret = True    for mode in modes:      ret = ret and os.access(self.__stateFile, mode)    return ret  def read(self, id=None):    info = {}        self.__set_state_file(id)      if self.__stateFile:      if os.path.isfile(self.__stateFile):        stateFile = open(self.__stateFile, 'r')        try:          info = cPickle.load(stateFile)        except EOFError:          pass                stateFile.close()        return info             def write(self, id, info):    self.__set_state_file(id)    if not os.path.exists(self.__stateFile):      self.clear(id)     stateFile = open(self.__stateFile, 'w')    cPickle.dump(info, stateFile)    stateFile.close()    def clear(self, id=None):    self.__set_state_file(id)    if self.__stateFile and os.path.exists(self.__stateFile):      os.remove(self.__stateFile)    else:      for item in os.listdir(self.__store):        if item.endswith(self.__STORE_EXT):          os.remove(item)        class hodRunner:  def __init__(self, cfg, log=None, cluster=None):    self.__hodhelp = hodHelp()    self.__ops = self.__hodhelp.ops    self.__cfg = cfg      self.__npd = self.__cfg['nodepooldesc']    self.__opCode = 0    self.__user = getpass.getuser()    self.__registry = None    self.__baseLogger = None    # Allowing to pass in log object to help testing - a stub can be passed in    if log is None:      self.__setup_logger()    else:      self.__log = log        self.__userState = hodState(self.__cfg['hod']['user_state'])         self.__clusterState = None    self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }        # Allowing to pass in log object to help testing - a stib can be passed in    if cluster is None:      self.__cluster = hadoopCluster(self.__cfg, self.__log)    else:      self.__cluster = cluster    def __setup_logger(self):    self.__baseLogger = hodlib.Common.logger.hodLog('hod')    self.__log = self.__baseLogger.add_logger(self.__user )     if self.__cfg['hod']['stream']:      self.__baseLogger.add_stream(level=self.__cfg['hod']['debug'],                             addToLoggerNames=(self.__user ,))      if self.__cfg['hod'].has_key('syslog-address'):      self.__baseLogger.add_syslog(self.__cfg['hod']['syslog-address'],                                    level=self.__cfg['hod']['debug'],                                    addToLoggerNames=(self.__user ,))  def get_logger(self):    return self.__log  def __setup_cluster_logger(self, directory):    self.__baseLogger.add_file(logDirectory=directory, level=4,                                addToLoggerNames=(self.__user ,))  def __setup_cluster_state(self, directory):    self.__clusterState = hodState(directory)  def __norm_cluster_dir(self, directory):    directory = os.path.expanduser(directory)    if not os.path.isabs(directory):      directory = os.path.join(self.__cfg['hod']['original-dir'], directory)    directory = os.path.abspath(directory)        return directory    def __setup_service_registry(self):    cfg = self.__cfg['hod'].copy()    cfg['debug'] = 0    self.__registry = svcrgy(cfg, self.__log)    self.__registry.start()    self.__log.debug(self.__registry.getXMLRPCAddr())    self.__cfg['hod']['xrs-address'] = self.__registry.getXMLRPCAddr()    self.__cfg['ringmaster']['svcrgy-addr'] = self.__cfg['hod']['xrs-address']  def __set_cluster_state_info(self, env, hdfs, mapred, ring, jobid, min, max):    self.__clusterStateInfo['env'] = env    self.__clusterStateInfo['hdfs'] = "http://%s" % hdfs    self.__clusterStateInfo['mapred'] = "http://%s" % mapred    self.__clusterStateInfo['ring'] = ring    self.__clusterStateInfo['jobid'] = jobid    self.__clusterStateInfo['min'] = min    self.__clusterStateInfo['max'] = max      def __set_user_state_info(self, info):    userState = self.__userState.read(CLUSTER_DATA_FILE)    for key in info.keys():      userState[key] = info[key]          self.__userState.write(CLUSTER_DATA_FILE, userState)    def __remove_cluster(self, clusterDir):    clusterInfo = self.__userState.read(CLUSTER_DATA_FILE)    if clusterDir in clusterInfo:      del(clusterInfo[clusterDir])      self.__userState.write(CLUSTER_DATA_FILE, clusterInfo)        def __cleanup(self):    if self.__registry: self.__registry.stop()      def __check_operation(self, operation):        opList = operation.split()        if not opList[0] in self.__ops:      self.__log.critical("Invalid hod operation specified: %s" % operation)      self._op_help(None)      self.__opCode = 2             return opList     def __adjustMasterFailureCountConfig(self, nodeCount):    # This method adjusts the ringmaster.max-master-failures variable    # to a value that is bounded by the a function of the number of    # nodes.    maxFailures = self.__cfg['ringmaster']['max-master-failures']    # Count number of masters required - depends on which services    # are external    masters = 0    if not self.__cfg['gridservice-hdfs']['external']:      masters += 1    if not self.__cfg['gridservice-mapred']['external']:      masters += 1    # So, if there are n nodes and m masters, we look atleast for    # all masters to come up. Therefore, atleast m nodes should be    # good, which means a maximum of n-m master nodes can fail.    maxFailedNodes = nodeCount - masters    # The configured max number of failures is now bounded by this    # number.    self.__cfg['ringmaster']['max-master-failures'] = \                              min(maxFailures, maxFailedNodes)      def _op_allocate(self, args):    operation = "allocate"    argLength = len(args)    min = 0    max = 0    errorFlag = False    errorMsgs = []    if argLength == 3:      nodes = args[2]      clusterDir = self.__norm_cluster_dir(args[1])      if not os.path.exists(clusterDir):        try:          os.makedirs(clusterDir)        except OSError, err:          errorFlag = True          errorMsgs.append("Could not create cluster directory. %s" \                            % (str(err)))      elif not os.path.isdir(clusterDir):        errorFlag = True        errorMsgs.append( \                    "Invalid cluster directory (--hod.clusterdir or -d) : " + \                         clusterDir + " : Not a directory")              if int(nodes) < 3 :        errorFlag = True        errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \                         "Must be >= 3. Given nodes: %s" % nodes)      if errorFlag:        for msg in errorMsgs:          self.__log.critical(msg)        self.__opCode = 3        return      if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, \                                              (os.R_OK, os.W_OK)):        self.__log.critical(INVALID_STATE_FILE_MSGS[2] % \                         self.__userState.get_state_file())        self.__opCode = 1        return      clusterList = self.__userState.read(CLUSTER_DATA_FILE)      if clusterDir in clusterList.keys():        self.__setup_cluster_state(clusterDir)        clusterInfo = self.__clusterState.read()        # Check if the job is not running. Only then can we safely        # allocate another cluster. Otherwise the user would need        # to deallocate and free up resources himself.        if clusterInfo.has_key('jobid') and \            self.__cluster.is_cluster_deallocated(clusterInfo['jobid']):          self.__log.warn("Found a dead cluster at cluster directory '%s'. Deallocating it to allocate a new one." % (clusterDir))          self.__remove_cluster(clusterDir)          self.__clusterState.clear()        else:          self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. HOD cannot determine if this cluster can be automatically deallocated. Deallocate the cluster if it is unused." % (clusterDir))          self.__opCode = 12          return       self.__setup_cluster_logger(clusterDir)      if re.match('\d+-\d+', nodes):        (min, max) = nodes.split("-")        min = int(min)        max = int(max)      else:        try:          nodes = int(nodes)          min = nodes          max = nodes        except ValueError:          print self.__hodhelp.help(operation)          self.__log.critical(          "%s operation requires a pos_int value for n(nodecount)." %           operation)          self.__opCode = 3        else:          self.__setup_cluster_state(clusterDir)          clusterInfo = self.__clusterState.read()          self.__opCode = self.__cluster.check_cluster(clusterInfo)          if self.__opCode == 0 or self.__opCode == 15:            self.__setup_service_registry()               if hodInterrupt.isSet():               self.__cleanup()              raise HodInterruptException()            self.__log.debug("Service Registry started.")            self.__adjustMasterFailureCountConfig(nodes)                        try:              allocateStatus = self.__cluster.allocate(clusterDir, min, max)                except HodInterruptException, h:              self.__cleanup()              raise h            # Allocation has gone through.            # Don't care about interrupts any more            try:              if allocateStatus == 0:                self.__set_cluster_state_info(os.environ,                                               self.__cluster.hdfsInfo,                                               self.__cluster.mapredInfo,                                               self.__cluster.ringmasterXRS,                                              self.__cluster.jobId,                                              min, max)                self.__setup_cluster_state(clusterDir)                self.__clusterState.write(self.__cluster.jobId,                                           self.__clusterStateInfo)                #  Do we need to check for interrupts here ??                  self.__set_user_state_info(                   { clusterDir : self.__cluster.jobId, } )              self.__opCode = allocateStatus            except Exception, e:              # Some unknown problem.              self.__cleanup()              self.__cluster.deallocate(clusterDir, self.__clusterStateInfo)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -