⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 idlejobtracker.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
字号:
#Licensed to the Apache Software Foundation (ASF) under one#or more contributor license agreements.  See the NOTICE file#distributed with this work for additional information#regarding copyright ownership.  The ASF licenses this file#to you under the Apache License, Version 2.0 (the#"License"); you may not use this file except in compliance#with the License.  You may obtain a copy of the License at#     http://www.apache.org/licenses/LICENSE-2.0#Unless required by applicable law or agreed to in writing, software#distributed under the License is distributed on an "AS IS" BASIS,#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#See the License for the specific language governing permissions and#limitations under the License.import os, re, timefrom hodlib.Common.threads import loop, funcfrom hodlib.Common.threads import simpleCommandfrom hodlib.Common.util import get_exception_string, hadoopVersionclass HadoopJobStatus:  """This class represents the status of a single Hadoop job"""    def __init__(self, jobId, status):    self.__jobId = jobId    self.__status = status  def getJobId(self):    return self.__jobId  def getStatus(self):    return self.__statusclass HadoopClientException(Exception):  """This class represents an exception that is raised when we fail in     running the job client."""    def __init__(self, errorCode):    self.errorCode = errorCode  class JobTrackerMonitor:  """This class monitors the JobTracker of an allocated cluster     periodically to detect whether it is idle. If it is found     to be idle for more than a configured limit, it calls back     registered handlers who can act upon the idle cluster."""  def __init__(self, log, idleJTHandler, interval, limit,                      hadoopDir, javaHome, servInfoProvider):    self.__log = log    self.__idlenessLimit = limit    self.__idleJobTrackerHandler = idleJTHandler    self.__hadoopDir = hadoopDir    hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")    #hadoop directory can be from pkgs or a temp location like tarball. Verify once.    if not os.path.exists(hadoopPath):      raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)    self.__javaHome = javaHome    # Note that when this object is created, we don't yet know the JT URL.    # The service info provider will be polled until we get the URL.    self.__serviceInfoProvider = servInfoProvider    self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")    self.__jobStatusRegExp = re.compile("(\S+)\s+(\d)\s+\d+\s+\S+$")    self.__firstIdleTime = 0    self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }    #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.    if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):      raise Exception('Incompatible Hadoop Version: Cannot check status')    self.__stopFlag = False    self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)    self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker,                                  sleep=interval)    self.__jobTrackerURL = None  def start(self):    """This method starts a thread that will determine the JobTracker URL"""    self.__jtURLFinderThread.start()  def stop(self):    self.__log.debug('Joining the monitoring thread.')    self.__stopFlag = True    if self.__jtMonitorThread.isAlive():      self.__jtMonitorThread.join()    self.__log.debug('Joined the monitoring thread.')  def getJobTrackerURL(self):    """This method periodically checks the service info provider for the JT URL"""    self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')    while not self.__stopFlag and not self.__isValidJobTrackerURL():      time.sleep(10)      if not self.__stopFlag:        self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')      else:        break    if self.__isValidJobTrackerURL():      self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)      self.__jtMonitorThread.start()  def monitorJobTracker(self):    """This method is periodically called to monitor the JobTracker of the cluster."""    try:      if self.__isIdle():        if self.__idleJobTrackerHandler:          self.__log.info('Detected cluster as idle. Calling registered callback handler.')          self.__idleJobTrackerHandler.handleIdleJobTracker()    except:      self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())  def getJobsStatus(self):    """This method should return the status of all jobs that are run on the HOD allocated       hadoop cluster"""    jobStatusList = []    try:      hadoop16Version = { 'major' : '0', 'minor' : '16' }      if self.__isCompatibleHadoopVersion(hadoop16Version):        jtStatusCommand = self.__initStatusCommand(option='-list all')        jtStatusCommand.start()        jtStatusCommand.wait()        jtStatusCommand.join()        if jtStatusCommand.exit_code() == 0:          for line in jtStatusCommand.output():            jobStatus = self.__extractJobStatus(line)            if jobStatus is not None:              jobStatusList.append(jobStatus)    except:      self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())    return jobStatusList  def __isValidJobTrackerURL(self):    """This method checks that the passed in URL is not one of the special case strings       returned by the getServiceAddr API"""    return ((self.__jobTrackerURL != None) and (self.__jobTrackerURL != 'not found') \              and (not self.__jobTrackerURL.startswith('Error')))  def __extractJobStatus(self, line):    """This method parses an output line from the job status command and creates       the JobStatus object if there is a match"""    jobStatus = None    line = line.strip()    jsMatch = self.__jobStatusRegExp.match(line)    if jsMatch:      jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))    return jobStatus  def __isIdle(self):    """This method checks if the JobTracker is idle beyond a certain limit."""    jobCount = 0    err = False    try:      jobCount = self.__getJobCount()    except HadoopClientException, hce:      self.__log.debug('HadoopClientException handled in getting job count. \                                      Error code: %s' % hce.errorCode)      err = True    if (jobCount==0) or err:      if self.__firstIdleTime == 0:        #detecting idleness for the first time        self.__firstIdleTime = time.time()      else:        if ((time.time()-self.__firstIdleTime) >= self.__idlenessLimit):          self.__log.info('Idleness limit crossed for cluster')          return True    else:      # reset idleness time      self.__firstIdleTime = 0          return False  def __getJobCount(self):    """This method executes the hadoop job -list command and parses the output to detect       the number of running jobs."""    # We assume here that the poll interval is small enough to detect running jobs.     # If jobs start and stop within the poll interval, the cluster would be incorrectly     # treated as idle. Hadoop 2266 will provide a better mechanism than this.    jobs = -1    jtStatusCommand = self.__initStatusCommand()    jtStatusCommand.start()    jtStatusCommand.wait()    jtStatusCommand.join()    if jtStatusCommand.exit_code() == 0:      for line in jtStatusCommand.output():        match = self.__jobCountRegExp.match(line)        if match:          jobs = int(match.group(1))    elif jtStatusCommand.exit_code() == 1:      # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets      # to differentiate and give more granular exit codes, we can check for those errors      # corresponding to network errors etc.      raise HadoopClientException(jtStatusCommand.exit_code())    return jobs  def __isCompatibleHadoopVersion(self, expectedVersion):    """This method determines whether the version of hadoop being used is one that        is higher than the expectedVersion.       This can be used for checking if a particular feature is available or not"""    ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)    ret = False      if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \      and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):      ret = True    return ret  def __initStatusCommand(self, option="-list"):    """This method initializes the command to run to check the JT status"""    cmd = None    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')    cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)    cmdStr = "%s %s" % (cmdStr, option)    self.__log.debug('cmd str %s' % cmdStr)    env = os.environ    env['JAVA_HOME'] = self.__javaHome    cmd = simpleCommand('HadoopStatus', cmdStr, env)    return cmd   

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -