⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ringmaster.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
      cfg['hodring']['download-addr'] = self.tarAddress     self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)  def __init_job_tracker_monitor(self, logMasterSources):    hadoopDir = self.__getHadoopDir()    self.log.debug('hadoopdir=%s, java-home=%s' % \                (hadoopDir, self.cfg['hodring']['java-home']))    try:      self.__jtMonitor = JobTrackerMonitor(self.log, self,                             self.cfg['ringmaster']['jt-poll-interval'],                             self.cfg['ringmaster']['idleness-limit'],                            hadoopDir, self.cfg['hodring']['java-home'],                            logMasterSources)      self.log.debug('starting jt monitor')      self.__jtMonitor.start()    except:      self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle.\                          Exception message: %s' % get_exception_error_string())      self.log.debug('Exception details: %s' % get_exception_string())  def __getHadoopDir(self):    hadoopDir = None    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):      tarFile = os.path.join(os.getcwd(), self.basename)      ret = untar(tarFile, os.getcwd())      if not ret:        raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \                            % (tarFile, os.getcwd()))      hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))    else:      hadoopDir = self.cfg['gridservice-mapred']['pkgs']    self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)    return hadoopDir  def __get_dir(self, name):    """Return the root directory inside the tarball    specified by name. Assumes that the tarball begins    with a root directory."""    import tarfile    myTarFile = tarfile.open(name)    hadoopPackage = myTarFile.getnames()[0]    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))    return hadoopPackage  def __find_tarball_in_dir(self, dir):    """Find the tarball among files specified in the given     directory. We need this method because how the tarball    source URI is given depends on the method of copy and    we can't get the tarball name from that.    This method will fail if there are multiple tarballs    in the directory with the same suffix."""    files = os.listdir(dir)    for file in files:      if self.tarSrcLoc.endswith(file):        return file    return None  def __copy_tarball(self, destDir):    """Copy the hadoop tar ball from a remote location to the    specified destination directory. Based on the URL it executes    an appropriate copy command. Throws an exception if the command    returns a non-zero exit code."""    # for backwards compatibility, treat the default case as file://    url = ''    if self.tarSrcLoc.startswith('/'):      url = 'file:/'    src = '%s%s' % (url, self.tarSrcLoc)    if src.startswith('file://'):      src = src[len('file://')-1:]      cpCmd = '/bin/cp'      cmd = '%s %s %s' % (cpCmd, src, destDir)      self.log.debug('Command to execute: %s' % cmd)      copyProc = simpleCommand('remote copy', cmd)      copyProc.start()      copyProc.wait()      copyProc.join()      ret = copyProc.exit_code()      self.log.debug('Completed command execution. Exit Code: %s.' % ret)      if ret != 0:        output = copyProc.output()        raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s'                         % (cmd, ret, output))    else:      raise Exception('Unsupported URL for file: %s' % src)# input: http://hostname:port/. output: [hostname,port]  def __url_to_addr(self, url):    addr = url.rstrip('/')    if addr.startswith('http://'):      addr = addr.replace('http://', '', 1)    addr_parts = addr.split(':')    return [addr_parts[0], int(addr_parts[1])]  def __initialize_signal_handlers(self):     def sigStop(sigNum, handler):      sig_wrapper(sigNum, self.stop)      signal.signal(signal.SIGTERM, sigStop)    signal.signal(signal.SIGINT, sigStop)    signal.signal(signal.SIGQUIT, sigStop)  def __clean_up(self):    tempDir = self.__get_tempdir()    os.chdir(os.path.split(tempDir)[0])    if os.path.exists(tempDir):      shutil.rmtree(tempDir, True)          self.log.debug("Cleaned up temporary dir: %s" % tempDir)  def __get_tempdir(self):    dir = os.path.join(self.cfg['ringmaster']['temp-dir'],                           "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'],                                                 self.np.getServiceId()))    return dir  def getWorkDirs(self, cfg, reUse=False):    if (not reUse) or (self.workDirs == None):      import math      frand = random.random()      while math.ceil(frand) != math.floor(frand):        frand = frand * 100      irand = int(frand)      uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand)      dirs = []      parentDirs = cfg['ringmaster']['work-dirs']      for p in parentDirs:        dir = os.path.join(p, uniq)        dirs.append(dir)      self.workDirs = dirs    return self.workDirs  def _fetchLink(self, link, parentDir):    parser = miniHTMLParser()    self.log.debug("Checking link %s" %link)    while link:      # Get the file from the site and link      input = urllib.urlopen(link)      out = None      contentType = input.info().gettype()      isHtml = contentType == 'text/html'      #print contentType      if isHtml:        parser.setBaseUrl(input.geturl())      else:        parsed = urlparse.urlparse(link)        hp = parsed[1]        h = hp        p = None        if hp.find(':') != -1:          h, p = hp.split(':', 1)        path = parsed[2]        path = path.split('/')        file = os.path.join(parentDir, h, p)        for c in path:          if c == '':            continue          file = os.path.join(file, c)        try:          self.log.debug('Creating %s' % file)          dir, tail = os.path.split(file)          if not os.path.exists(dir):            os.makedirs(dir)        except:          self.log.debug(get_exception_string())        out = open(file, 'w')      bufSz = 8192      buf = input.read(bufSz)      while len(buf) > 0:        if isHtml:          # Feed the file into the HTML parser          parser.feed(buf)        if out:          out.write(buf)        buf = input.read(bufSz)      input.close()      if out:        out.close()      # Search the retfile here      # Get the next link in level traversal order      link = parser.getNextLink()          parser.close()      def _finalize(self):    try:      # FIXME: get dir from config      dir = 'HOD-log-P%d' % (os.getpid())      dir = os.path.join('.', dir)    except:      self.log.debug(get_exception_string())    self.np.finalize()  def handleIdleJobTracker(self):    self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up." \                          % self.cfg['ringmaster']['idleness-limit'])    self.__idlenessDetected = True  def cd_to_tempdir(self):    dir = self.__get_tempdir()        if not os.path.exists(dir):      os.makedirs(dir)    os.chdir(dir)        return dir    def getWorkload(self):    return self.workload  def getHostName(self):    return self.__hostname  def start(self):    """run the thread main loop"""        self.log.debug("Entered start method.")    hodring = os.path.join(self.cfg['ringmaster']['base-dir'],                            'bin', 'hodring')    largs = [hodring]    targs = self.cfg.get_args(section='hodring')    largs.extend(targs)         hodringCmd = ""    for item in largs:      hodringCmd = "%s%s " % (hodringCmd, item)          self.log.debug(hodringCmd)        if self.np.runWorkers(largs) > 0:      self.log.critical("Failed to start worker.")        self.log.debug("Returned from runWorkers.")        self._finalize()  def __findExitCode(self):    """Determine the exit code based on the status of the cluster or jobs run on them"""    xmlrpcServer = ringMasterServer.instance.logMasterSources    if xmlrpcServer.getServiceAddr('hdfs') == 'not found' or \        xmlrpcServer.getServiceAddr('hdfs').startswith("Error: "):      self.__exitCode = 7    elif xmlrpcServer.getServiceAddr('mapred') == 'not found' or \        xmlrpcServer.getServiceAddr('mapred').startswith("Error: "):      self.__exitCode = 8    else:      clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),                                          xmlrpcServer.getServiceAddr('mapred'))      if clusterStatus != 0:        self.__exitCode = clusterStatus      else:        self.__exitCode = self.__findHadoopJobsExitCode()    self.log.debug('exit code %s' % self.__exitCode)  def __findHadoopJobsExitCode(self):    """Determine the consolidate exit code of hadoop jobs run on this cluster, provided       this information is available. Return 0 otherwise"""    ret = 0    failureStatus = 3    failureCount = 0    if self.__jtMonitor:      jobStatusList = self.__jtMonitor.getJobsStatus()      try:        if len(jobStatusList) > 0:          for jobStatus in jobStatusList:            self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),                                                       jobStatus.getStatus()))            if jobStatus.getStatus() == failureStatus:              failureCount = failureCount+1        if failureCount > 0:          if failureCount == len(jobStatusList): # all jobs failed            ret = 16          else:            ret = 17      except:        self.log.debug('exception in finding hadoop jobs exit code' % get_exception_string())    return ret  def stop(self):    self.log.debug("RingMaster stop method invoked.")    if self.__stopInProgress or self.__isStopped:      return    self.__stopInProgress = True    if ringMasterServer.instance is not None:      self.log.debug('finding exit code')      self.__findExitCode()      self.log.debug('stopping ringmaster instance')      ringMasterServer.stopService()    else:      self.__exitCode = 6    if self.__jtMonitor is not None:      self.__jtMonitor.stop()    if self.httpServer:      self.httpServer.stop()          self.__clean_up()    self.__isStopped = True  def shouldStop(self):    """Indicates whether the main loop should exit, either due to idleness condition,     or a stop signal was received"""    return self.__idlenessDetected or self.__isStopped  def getExitCode(self):    """return the exit code of the program"""    return self.__exitCodedef main(cfg,log):  try:    rm = None    dGen = DescGenerator(cfg)    cfg = dGen.initializeDesc()    rm = RingMaster(cfg, log)    rm.start()    while not rm.shouldStop():      time.sleep(1)    rm.stop()    log.debug('returning from main')    return rm.getExitCode()  except Exception, e:    if log:      log.critical(get_exception_string())    raise Exception(e)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -