⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 torque.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
字号:
#Licensed to the Apache Software Foundation (ASF) under one#or more contributor license agreements.  See the NOTICE file#distributed with this work for additional information#regarding copyright ownership.  The ASF licenses this file#to you under the Apache License, Version 2.0 (the#"License"); you may not use this file except in compliance#with the License.  You may obtain a copy of the License at#     http://www.apache.org/licenses/LICENSE-2.0#Unless required by applicable law or agreed to in writing, software#distributed under the License is distributed on an "AS IS" BASIS,#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#See the License for the specific language governing permissions and#limitations under the License."""Maui/Torque implementation of NodePool"""# -*- python -*-import os, sys, csv, socket, time, re, pprintfrom hodlib.Hod.nodePool import *from hodlib.Schedulers.torque import torqueInterfacefrom hodlib.Common.threads import simpleCommandfrom hodlib.Common.util import get_exception_string, args_to_string, local_fqdn, \                        TORQUE_USER_LIMITS_COMMENT_FIELDclass TorqueNodeSet(NodeSet):  def __init__(self, id, numNodes, preferredList, isPreemptee):    NodeSet.__init__(self, id, numNodes, preferredList, isPreemptee)    self.qsubId = None    self.addrList = []  def _setQsubId(self, qsubId):    self.qsubId = qsubId  def _setAddrList(self, addrList):    self.addrList = addrList  def getAddrList(self):    return self.addrListclass TorquePool(NodePool):  def __init__(self, nodePoolDesc, cfg, log):    NodePool.__init__(self, nodePoolDesc, cfg, log)    environ = os.environ.copy()        if self._cfg['resource_manager'].has_key('pbs-server'):      environ['PBS_DEFAULT'] = self._cfg['resource_manager']['pbs-server']    self.__torque = torqueInterface(      self._cfg['resource_manager']['batch-home'], environ, self._log)  def __gen_submit_params(self, nodeSet, walltime = None, qosLevel = None,                           account = None):    argList = []    stdinList = []        npd = self.nodePoolDesc        def gen_stdin_list():      # Here we are basically generating the standard input for qsub.      #  Specifically a script to exec ringmaster.      stdinList.append('#!/bin/sh')            ringBin = os.path.join(self._cfg['hod']['base-dir'], 'bin',                              'ringmaster')      ringArgs = [ringBin,]      ringArgs.extend(self._cfg.get_args(exclude=('hod')))            ringMasterCommand = args_to_string(ringArgs)            self._log.debug("ringmaster cmd: %s" % ringMasterCommand)            stdinList.append(ringMasterCommand)          def gen_arg_list():            def process_qsub_attributes():        rawAttributes = self.nodePoolDesc.getAttrs()            # 'W:x' is used to specify torque management extentensions ie -W x= ...        resourceManagementExtensions = ''        if 'W:x' in rawAttributes:          resourceManagementExtensions = rawAttributes['W:x']            if qosLevel:          if len(resourceManagementExtensions) > 0:            resourceManagementExtensions += ';'          resourceManagementExtensions += 'QOS:%s' % (qosLevel)            rawAttributes['W:x'] = resourceManagementExtensions                hostname = local_fqdn()            rawAttributes['l:nodes'] = nodeSet._getNumNodes()                if walltime:          rawAttributes['l:walltime'] = walltime                #create a dict of dictionaries for         # various arguments of torque        cmds = {}        for key in rawAttributes:          value = rawAttributes[key]              if key.find(':') == -1:            raise ValueError, 'Syntax error: missing colon after %s in %s=%s' % (              key, key, value)              [option, subOption] = key.split(':', 1)          if not option in cmds:            cmds[option] = {}          cmds[option][subOption] = value                opts = []        #create a string from this        #dictionary of dictionaries createde above        for k in cmds:          csv = []          nv = cmds[k]          for n in nv:            v = nv[n]            if len(n) == 0:              csv.append(v)            else:              csv.append('%s=%s' % (n, v))          opts.append('-%s' % (k))          opts.append(','.join(csv))            for option in cmds:          commandList = []          for subOption in cmds[option]:            value = cmds[option][subOption]            if len(subOption) == 0:                commandList.append(value)            else:                commandList.append("%s=%s" % (subOption, value))          opts.append('-%s' % option)          opts.append(','.join(commandList))                  return opts            pkgdir = npd.getPkgDir()        qsub = os.path.join(pkgdir, 'bin', 'qsub')      sdd = self._cfg['servicedesc']            gsvc = None      for key in sdd:        gsvc = sdd[key]        break            argList.extend(process_qsub_attributes())      argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))      argList.extend(('-r','n'))      if 'pbs-user' in self._cfg['resource_manager']:        argList.extend(('-u', self._cfg['resource_manager']['pbs-user']))        argList.extend(('-d','/tmp/'))      if 'queue' in self._cfg['resource_manager']:        queue = self._cfg['resource_manager']['queue']        argList.extend(('-q',queue))        # In HOD 0.4, we pass in an account string only if it is mentioned.      # Also, we don't append userid to the account string, as HOD jobs run as the       # user running them, not as 'HOD' user.      if self._cfg['resource_manager'].has_key('pbs-account'):        argList.extend(('-A', (self._cfg['resource_manager']['pbs-account'])))          if 'env-vars' in self._cfg['resource_manager']:        qsub_envs = self._cfg['resource_manager']['env-vars']        argList.extend(('-v', self.__keyValToString(qsub_envs)))    gen_arg_list()    gen_stdin_list()        return argList, stdinList      def __keyValToString(self, keyValList):    ret = ""    for key in keyValList:      ret = "%s%s=%s," % (ret, key, keyValList[key])    return ret[:-1]    def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):    if not id:      id = self.getNextNodeSetId()        nodeSet = TorqueNodeSet(id, numNodes, preferred, isPreemptee)    self.nodeSetDict[nodeSet.getId()] = nodeSet        return nodeSet        def submitNodeSet(self, nodeSet, walltime = None, qosLevel = None,                     account = None):    argList, stdinList = self.__gen_submit_params(nodeSet, walltime, qosLevel,                                                   account)        jobId, exitCode = self.__torque.qsub(argList, stdinList)        ## UNUSED CODE: LINE ##    nodeSet.qsubId = jobId    return jobId, exitCode  def freeNodeSet(self, nodeSet):        exitCode = self.deleteJob(nodeSet.getId())        del self.nodeSetDict[nodeSet.getId()]      return exitCode    def finalize(self):    status = 0    exitCode = 0    for nodeSet in self.nodeSetDict.values():      exitCode = self.freeNodeSet(nodeSet)          if exitCode > 0 and exitCode != 153:      status = 4          return status      ## UNUSED METHOD ?? ##  def getWorkers(self):    hosts = []        qstatInfo = self.__torque(self.getServiceId())    if qstatInfo:      hosts = qstatInfop['exec_host']        return hosts   ## UNUSED METHOD ?? ##  def pollNodeSet(self, nodeSet):    status = NodeSet.COMPLETE      nodeSet = self.nodeSetDict[0]     qstatInfo = self.__torque(self.getServiceId())    if qstatMap:          jobstate = qstatMap['job_state']      exechost = qstatMap['exec_host']    if jobstate == 'Q':      status = NodeSet.PENDING    elif exechost == None:      status = NodeSet.COMMITTED    else:      nodeSet._setAddrList(exec_host)    return status          def getServiceId(self):    id = None        nodeSets = self.nodeSetDict.values()    if len(nodeSets):      id = nodeSets[0].qsubId          if id == None:      id = os.getenv('PBS_JOBID')          return id  def getJobInfo(self, jobId=None):    #torque error code when credentials fail, a temporary condition sometimes.    credFailureErrorCode = 171    jobNonExistentErrorCode = 153    credFailureRetries = 10    i = 0    self.__jobInfo = None        if jobId == None:      jobId = self.getServiceId()    while i < credFailureRetries:      qstatInfo, exitCode = self.__torque.qstat(jobId)      if exitCode == 0:        self.__jobInfo = qstatInfo        break      elif exitCode == jobNonExistentErrorCode:        # This really means that the job completed        # However, setting only job_state for now, not         # any other attributes, as none seem required.        self.__jobInfo = { 'job_state' : 'C' }        break      else:        if exitCode == credFailureErrorCode:          time.sleep(1)          i = i+1        else:          break    return self.__jobInfo  def deleteJob(self, jobId):    exitCode = self.__torque.qdel(jobId)    return exitCode  def isJobFeasible(self):    comment = None    msg = None    if self.__jobInfo.has_key('comment'):      comment = self.__jobInfo['comment']    try:      if comment:        commentField = re.compile(self._cfg['hod']['job-feasibility-attr'])        match = commentField.search(comment)        if match:          reqUsage = int(match.group(1))          currentUsage = int(match.group(2))          maxUsage = int(match.group(3))          msg = "Current Usage:%s, Requested:%s, Maximum Limit:%s " % \                                  (currentUsage, reqUsage, maxUsage)          if reqUsage > maxUsage:            return "Never", msg          if reqUsage + currentUsage > maxUsage:            return False, msg    except Exception, e:      self._log.error("Error in isJobFeasible : %s" %e)      raise Exception(e)    return True, msg      def runWorkers(self, args):    return self.__torque.pbsdsh(args)  def updateWorkerInfo(self, workerInfoMap, jobId):    workerInfoStr = ''    for key in workerInfoMap.keys():      workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])    exitCode = self.__torque.qalter("notes", workerInfoStr[1:], jobId)    return exitCode

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -