⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 torque.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
字号:
#Licensed to the Apache Software Foundation (ASF) under one#or more contributor license agreements.  See the NOTICE file#distributed with this work for additional information#regarding copyright ownership.  The ASF licenses this file#to you under the Apache License, Version 2.0 (the#"License"); you may not use this file except in compliance#with the License.  You may obtain a copy of the License at#     http://www.apache.org/licenses/LICENSE-2.0#Unless required by applicable law or agreed to in writing, software#distributed under the License is distributed on an "AS IS" BASIS,#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#See the License for the specific language governing permissions and#limitations under the License.import os, pprint, re, timefrom hodlib.Common.threads import simpleCommandfrom hodlib.Common.util import args_to_stringfrom hodlib.Common.logger import hodDummyLoggerreQstatLine = re.compile("^\s*(\w+)\s*=\s*(.*)\s*$")class torqueInterface:  def __init__(self, torqueDir, environment, log=None):    self.__qsub = os.path.join(torqueDir, 'bin', 'qsub')    self.__qdel = os.path.join(torqueDir, 'bin', 'qdel')    self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')    self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')    self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')    self.__qalter = os.path.join(torqueDir, 'bin', 'qalter')    self.__env = environment        self.__log = log    if not self.__log:      self.__log = hodDummyLogger()          def qsub(self, argList, stdinList):    jobID = False    exitCode = 0    qsubCommand = "%s %s" % (self.__qsub, args_to_string(argList))        self.__log.debug("qsub -> %s" % qsubCommand)        qsubProcess = simpleCommand('qsub', qsubCommand, env=self.__env)    qsubProcess.start()        while qsubProcess.stdin == None:      time.sleep(.2)    try:      for line in stdinList:        self.__log.debug("qsub stdin: %s" % line)        print >>qsubProcess.stdin, line      qsubProcess.stdin.close()    except IOError, i:      # If torque's qsub is given invalid params, it fails & returns immediately      # Check for such errors here      # Wait for command execution to finish      qsubProcess.wait()      qsubProcess.join()      output = qsubProcess.output()      if output!=[]:        self.__log.critical("qsub Failure : %s " % output[0].strip())        self.__log.critical("qsub Command : %s" % qsubCommand)      return None, qsubProcess.exit_code()    qsubProcess.wait()    qsubProcess.join()        exitCode = qsubProcess.exit_code()    if exitCode == 0:      buffer = qsubProcess.output()      jobID = buffer[0].rstrip('\n')      self.__log.debug("qsub jobid: %s" % jobID)    else:      self.__log.critical("qsub error: %s" % qsubProcess.exit_status_string())            return jobID, exitCode    def qstat(self, jobID):    qstatInfo = None          qstatCommand = "%s -f -1 %s" % (self.__qstat, jobID)        self.__log.debug(qstatCommand)    qstatProcess = simpleCommand('qstat', qstatCommand, env=self.__env)    qstatProcess.start()    qstatProcess.wait()    qstatProcess.join()        exitCode = qstatProcess.exit_code()    if exitCode > 0:      self.__log.warn('qstat error: %s' % qstatProcess.exit_status_string())    else:      qstatInfo = {}      for line in qstatProcess.output():        line = line.rstrip()        if line.find('=') != -1:          qstatMatch = reQstatLine.match(line)          if qstatMatch:            key = qstatMatch.group(1)            value = qstatMatch.group(2)            qstatInfo[key] = value                if 'exec_host' in qstatInfo:        list = qstatInfo['exec_host'].split('+')        addrList = []                for item in list:          [head, end] = item.split('/', 1)          addrList.append(head)                qstatInfo['exec_host'] = addrList            return qstatInfo, exitCode    def pbs_nodes(self, argString):    pass    def qdel(self, jobId, force=False):    exitCode = 0    qdel = self.__qdel    if force:      qdel = "%s -p %s" % (qdel, jobId)    else:      qdel = "%s %s" % (qdel, jobId)     self.__log.debug(qdel)    qdelProcess = simpleCommand('qdel', qdel, env=self.__env)    qdelProcess.start()    qdelProcess.wait()    qdelProcess.join()                exitCode = qdelProcess.exit_code()        return exitCode    def pbsdsh(self, arguments):    status = None        pbsdshCommand = "%s %s" % (self.__pbsdsh, args_to_string(arguments))        self.__log.debug("pbsdsh command: %s" % pbsdshCommand)        pbsdsh = simpleCommand('pbsdsh', pbsdshCommand, env=self.__env)    pbsdsh.start()       for i in range(0, 30):      status = pbsdsh.exit_code()      if status:        self.__log.error("pbsdsh failed: %s" % pbsdsh.exit_status_string())        break          if not status: status = 0          return status    def qalter(self, fieldName, fieldValue, jobId):    """Update the job field with fieldName with the fieldValue.       The fieldValue must be modifiable after the job is submitted."""    # E.g. to alter comment: qalter -W notes='value` jobId    qalterCmd = '%s -W %s=\"%s\" %s' % (self.__qalter, fieldName, fieldValue, jobId)     self.__log.debug("qalter command: %s" % qalterCmd)    qalterProcess = simpleCommand('qalter', qalterCmd, env=self.__env)    qalterProcess.start()    qalterProcess.wait()    qalterProcess.join()    exitCode = qalterProcess.exit_code()    return exitCode

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -