⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 util.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
字号:
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import errno, sys, os, traceback, stat, socket, re, warnings, signal, time

from hodlib.Common.tcp import tcpSocket, tcpError
from hodlib.Common.threads import simpleCommand

# Bit position (power of two) of each special permission flag within the
# leading octal digit of a mode string.
setUGV   = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }

# Matches a backslash followed by (optionally) one character; the character
# is captured so that replace_escapes() can keep it and drop the backslash.
reEscapeSeq = r"\\(.)?"
reEscapeSeq = re.compile(reEscapeSeq)

HOD_INTERRUPTED_CODE = 127
HOD_INTERRUPTED_MESG = "Hod interrupted. Cleaning up and exiting"
TORQUE_USER_LIMITS_COMMENT_FIELD = "User-limits exceeded. " + \
        "Requested:([0-9]*) Used:([0-9]*) MaxLimit:([0-9]*)"
TORQUE_USER_LIMITS_EXCEEDED_MSG = "Requested number of nodes exceeded " + \
                                  "maximum user limits. "


class AlarmException(Exception):
    """Exception carrying a message that is also used as its repr."""

    def __init__(self, msg=''):
        self.message = msg
        Exception.__init__(self, msg)

    def __repr__(self):
        return self.message


def isProcessRunning(pid):
    '''Check if a process is running, by sending it a 0 signal, and checking for errors'''
    # This method is documented in some email threads on the python mailing list.
    # For e.g.: http://mail.python.org/pipermail/python-list/2002-May/144522.html
    try:
        os.kill(pid, 0)
    except OSError:
        # sys.exc_info() is used instead of "except OSError, err" so that the
        # module parses on both old and new Python versions.
        err = sys.exc_info()[1]
        # EPERM means the process exists but we may not signal it.
        return err.errno == errno.EPERM
    return True


def untar(file, targetDir):
    """Extract the gzipped tarball `file` into `targetDir`.

    Returns True when the tar command exits with code 0, False otherwise.
    """
    command = 'tar -C %s -zxf %s' % (targetDir, file)
    commandObj = simpleCommand('untar', command)
    commandObj.start()
    commandObj.wait()
    commandObj.join()
    return commandObj.exit_code() == 0


def tar(tarFile, tarDirectory, tarList):
    """Create the gzipped tarball `tarFile` from the entries of `tarList`.

    The command runs with `tarDirectory` as the working directory; the
    previous working directory is restored afterwards, even if running the
    command raises.

    Returns True on success, otherwise the command's exit status string.
    """
    currentDir = os.getcwd()
    os.chdir(tarDirectory)
    try:
        command = 'tar -czf %s ' % (tarFile)
        for entry in tarList:
            command = "%s%s " % (command, entry)

        commandObj = simpleCommand('tar', command)
        commandObj.start()
        commandObj.wait()
        commandObj.join()
        if commandObj.exit_code() == 0:
            status = True
        else:
            status = commandObj.exit_status_string()
    finally:
        # Never leave the process stranded in tarDirectory.
        os.chdir(currentDir)

    return status


def to_http_url(list):
    """convert [hostname, port]  to a http url"""
    return "http://%s:%s" % (list[0], list[1])


def get_exception_string():
    """Return the formatted traceback of the exception currently being handled."""
    (excType, excValue, tb) = sys.exc_info()
    return ''.join(traceback.format_exception(excType, excValue, tb))


def get_exception_error_string():
    """Return "<type> <value>" for the exception currently being handled.

    When the exception value is empty/false, the exception type itself is
    returned unformatted.
    """
    (excType, excValue, tb) = sys.exc_info()
    if excValue:
        exceptString = "%s %s" % (excType, excValue)
    else:
        exceptString = excType

    return exceptString


def check_timestamp(timeStamp):
    """ Checks the validity of a timeStamp.

        timeStamp - (YYYY-MM-DD HH:MM:SS in UTC)

        returns True or False
    """
    try:
        time.strptime(timeStamp, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: timeStamp is not a string at all.
        return False
    return True


def sig_wrapper(sigNum, handler, *args):
    """Invoke `handler`, ignoring `sigNum`.

    Any extra positional arguments are forwarded to the handler as a single
    tuple argument; with no extra arguments the handler is called bare.
    """
    if args:
        handler(args)
    else:
        handler()


def get_perms(filename):
    """Return the permission bits of `filename` as a 4-digit octal string.

    The first digit holds the setuid/setgid/sticky bits, followed by the
    user, group and other digits (for example '0644').
    """
    mode = stat.S_IMODE(os.stat(filename)[stat.ST_MODE])
    # S_IMODE keeps only the low 12 bits, so zero-padded octal formatting
    # yields exactly the special/user/group/other digits.
    return "%04o" % mode


def local_fqdn():
    """Return a system's true FQDN rather than any aliases, which are
       occasionally returned by socket.gethostname."""
    fqdn = None
    me = os.uname()[1]
    nameInfo = socket.gethostbyname_ex(me)
    # Consider the aliases first and the canonical name last; the last
    # dotted name that starts with our node name wins.
    nameInfo[1].append(nameInfo[0])
    for name in nameInfo[1]:
        if name.count(".") and name.startswith(me):
            fqdn = name
    if fqdn is None:
        fqdn = me
    return fqdn


def need_to_allocate(allocated, config, command):
    """Return False when no allocation is needed, True otherwise.

    No allocation is needed when `allocated` is already set, when `command`
    matches the dfs pattern and HDFS is configured as external, or when
    Map/Reduce is configured as external.
    """
    status = True

    if allocated.isSet():
        status = False
    elif re.search(r"\s*dfs.*$", command) and \
        config['gridservice-hdfs']['external']:
        status = False
    elif config['gridservice-mapred']['external']:
        status = False

    return status


def filter_warnings():
    """Silence the "'with' will become a reserved keyword" warning."""
    warnings.filterwarnings('ignore',
        message=".*?'with' will become a reserved keyword.*")


def args_to_string(list):
    """return a string argument space seperated"""
    return ' '.join(["%s" % (item,) for item in list])


def replace_escapes(object):
    r""" replace any escaped character. e.g \, with , \= with = and so on """
    # here object is either a config object or a options object
    for section in object._mySections:
        for option in object._configDef[section].keys():
            if not object[section].has_key(option):
                continue
            if object._configDef[section][option]['type'] != 'keyval':
                continue
            keyValDict = object[section][option]
            # Rebuild the option's dict with every value unescaped.
            object[section][option] = {}
            for (key, value) in keyValDict.iteritems():
                if reEscapeSeq.search(value):
                    value = reEscapeSeq.sub(r"\1", value)
                object[section][option][key] = value


def hadoopVersion(hadoopDir, java_home, log):
    """Run `<hadoopDir>/bin/hadoop version` and parse its first output line.

    Returns a dict with 'major' and 'minor' keys holding the matched digit
    strings; both stay None when the command fails, prints nothing, or its
    first line does not match "Hadoop <major>.<minor>".
    """
    # Determine the version of hadoop being used by executing the
    # hadoop version command. Code earlier in idleTracker.py
    version = { 'major' : None, 'minor' : None }
    hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
    cmd = "%s version" % hadoopPath
    log.debug('Executing command %s to find hadoop version' % cmd)
    # NOTE: env is os.environ itself, so JAVA_HOME is also set for this
    # process, not only for the child command.
    env = os.environ
    env['JAVA_HOME'] = java_home
    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
    hadoopVerCmd.start()
    hadoopVerCmd.wait()
    hadoopVerCmd.join()
    if hadoopVerCmd.exit_code() == 0:
        output = hadoopVerCmd.output()
        # Guard against a successful exit that produced no output at all.
        if output:
            verLine = output[0]
            log.debug('Version from hadoop command: %s' % verLine)
            hadoopVerRegExp = re.compile(r"Hadoop ([0-9]+)\.([0-9]+).*")
            verMatch = hadoopVerRegExp.match(verLine)
            if verMatch is not None:
                version['major'] = verMatch.group(1)
                version['minor'] = verMatch.group(2)
    return version


def get_cluster_status(hdfsAddress, mapredAddress):
    """Determine the status of the cluster based on socket availability
       of HDFS and Map/Reduce.

       Returns 0 when both sockets open, 14 when only Map/Reduce fails,
       13 when only HDFS fails and 10 when both fail.
    """
    status = 0

    mapredSocket = tcpSocket(mapredAddress)
    try:
        mapredSocket.open()
        mapredSocket.close()
    except tcpError:
        status = 14

    hdfsSocket = tcpSocket(hdfsAddress)
    try:
        hdfsSocket.open()
        hdfsSocket.close()
    except tcpError:
        if status > 0:
            status = 10
        else:
            status = 13

    return status


def parseEquals(list):
    """Convert a list of 'key=value' strings into a dict."""
    # takes in a list of keyval pairs e.g ['a=b','c=d'] and returns a
    # dict e.g {'a'='b','c'='d'}. Used in GridService/{mapred.py/hdfs.py} and
    # HodRing/hodring.py. No need for specially treating escaped =. as in \=,
    # since all keys are generated by hod and don't contain such anomalies
    result = {}
    for elems in list:
        # Split on the first '=' only, so values may themselves contain '='.
        splits = elems.split('=', 1)
        result[splits[0]] = splits[1]
    return result


def getMapredSystemDirectory(mrSysDirRoot, userid, jobid):
    """Return <mrSysDirRoot>/<userid>/mapredsystem/<jobid>."""
    return os.path.join(mrSysDirRoot, userid, 'mapredsystem', jobid)


class HodInterrupt:
    """Flag object that records whether a termination signal was received."""

    def __init__(self):
        self.HodInterruptFlag = False
        self.log = None

    def set_log(self, log):
        self.log = log

    def init_signals(self):
        """Install handlers for SIGTERM, SIGQUIT and SIGINT that set the flag."""

        # Defined before the handlers are registered so that a signal
        # arriving immediately after registration finds the closure bound.
        def sig_wrapper(sigNum, handler, *args):
            # The log is optional: a signal may arrive before set_log().
            if self.log is not None:
                self.log.critical("Caught signal %s." % sigNum )

            if args:
                handler(args)
            else:
                handler()

        def sigStop(sigNum, handler):
            sig_wrapper(sigNum, self.setFlag)

        signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
        signal.signal(signal.SIGQUIT, sigStop) # 3  : Quit program
        signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program

    def setFlag(self, val = True):
        self.HodInterruptFlag = val

    def isSet(self):
        return self.HodInterruptFlag


class HodInterruptException(Exception):
    """Exception whose string form is the repr of its value."""

    def __init__(self, value = ""):
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)


hodInterrupt = HodInterrupt()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -