# ringmaster.py
# [code-viewer UI label: "Font size:"]
cfg['hodring']['download-addr'] = self.tarAddress self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources) def __init_job_tracker_monitor(self, logMasterSources): hadoopDir = self.__getHadoopDir() self.log.debug('hadoopdir=%s, java-home=%s' % \ (hadoopDir, self.cfg['hodring']['java-home'])) try: self.__jtMonitor = JobTrackerMonitor(self.log, self, self.cfg['ringmaster']['jt-poll-interval'], self.cfg['ringmaster']['idleness-limit'], hadoopDir, self.cfg['hodring']['java-home'], logMasterSources) self.log.debug('starting jt monitor') self.__jtMonitor.start() except: self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle.\ Exception message: %s' % get_exception_error_string()) self.log.debug('Exception details: %s' % get_exception_string()) def __getHadoopDir(self): hadoopDir = None if self.cfg['ringmaster'].has_key('hadoop-tar-ball'): tarFile = os.path.join(os.getcwd(), self.basename) ret = untar(tarFile, os.getcwd()) if not ret: raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \ % (tarFile, os.getcwd())) hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile)) else: hadoopDir = self.cfg['gridservice-mapred']['pkgs'] self.log.debug('Returning Hadoop directory as: %s' % hadoopDir) return hadoopDir def __get_dir(self, name): """Return the root directory inside the tarball specified by name. Assumes that the tarball begins with a root directory.""" import tarfile myTarFile = tarfile.open(name) hadoopPackage = myTarFile.getnames()[0] self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage)) return hadoopPackage def __find_tarball_in_dir(self, dir): """Find the tarball among files specified in the given directory. We need this method because how the tarball source URI is given depends on the method of copy and we can't get the tarball name from that. 
This method will fail if there are multiple tarballs in the directory with the same suffix.""" files = os.listdir(dir) for file in files: if self.tarSrcLoc.endswith(file): return file return None def __copy_tarball(self, destDir): """Copy the hadoop tar ball from a remote location to the specified destination directory. Based on the URL it executes an appropriate copy command. Throws an exception if the command returns a non-zero exit code.""" # for backwards compatibility, treat the default case as file:// url = '' if self.tarSrcLoc.startswith('/'): url = 'file:/' src = '%s%s' % (url, self.tarSrcLoc) if src.startswith('file://'): src = src[len('file://')-1:] cpCmd = '/bin/cp' cmd = '%s %s %s' % (cpCmd, src, destDir) self.log.debug('Command to execute: %s' % cmd) copyProc = simpleCommand('remote copy', cmd) copyProc.start() copyProc.wait() copyProc.join() ret = copyProc.exit_code() self.log.debug('Completed command execution. Exit Code: %s.' % ret) if ret != 0: output = copyProc.output() raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s' % (cmd, ret, output)) else: raise Exception('Unsupported URL for file: %s' % src)# input: http://hostname:port/. 
output: [hostname,port] def __url_to_addr(self, url): addr = url.rstrip('/') if addr.startswith('http://'): addr = addr.replace('http://', '', 1) addr_parts = addr.split(':') return [addr_parts[0], int(addr_parts[1])] def __initialize_signal_handlers(self): def sigStop(sigNum, handler): sig_wrapper(sigNum, self.stop) signal.signal(signal.SIGTERM, sigStop) signal.signal(signal.SIGINT, sigStop) signal.signal(signal.SIGQUIT, sigStop) def __clean_up(self): tempDir = self.__get_tempdir() os.chdir(os.path.split(tempDir)[0]) if os.path.exists(tempDir): shutil.rmtree(tempDir, True) self.log.debug("Cleaned up temporary dir: %s" % tempDir) def __get_tempdir(self): dir = os.path.join(self.cfg['ringmaster']['temp-dir'], "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'], self.np.getServiceId())) return dir def getWorkDirs(self, cfg, reUse=False): if (not reUse) or (self.workDirs == None): import math frand = random.random() while math.ceil(frand) != math.floor(frand): frand = frand * 100 irand = int(frand) uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand) dirs = [] parentDirs = cfg['ringmaster']['work-dirs'] for p in parentDirs: dir = os.path.join(p, uniq) dirs.append(dir) self.workDirs = dirs return self.workDirs def _fetchLink(self, link, parentDir): parser = miniHTMLParser() self.log.debug("Checking link %s" %link) while link: # Get the file from the site and link input = urllib.urlopen(link) out = None contentType = input.info().gettype() isHtml = contentType == 'text/html' #print contentType if isHtml: parser.setBaseUrl(input.geturl()) else: parsed = urlparse.urlparse(link) hp = parsed[1] h = hp p = None if hp.find(':') != -1: h, p = hp.split(':', 1) path = parsed[2] path = path.split('/') file = os.path.join(parentDir, h, p) for c in path: if c == '': continue file = os.path.join(file, c) try: self.log.debug('Creating %s' % file) dir, tail = os.path.split(file) if not os.path.exists(dir): os.makedirs(dir) except: self.log.debug(get_exception_string()) 
out = open(file, 'w') bufSz = 8192 buf = input.read(bufSz) while len(buf) > 0: if isHtml: # Feed the file into the HTML parser parser.feed(buf) if out: out.write(buf) buf = input.read(bufSz) input.close() if out: out.close() # Search the retfile here # Get the next link in level traversal order link = parser.getNextLink() parser.close() def _finalize(self): try: # FIXME: get dir from config dir = 'HOD-log-P%d' % (os.getpid()) dir = os.path.join('.', dir) except: self.log.debug(get_exception_string()) self.np.finalize() def handleIdleJobTracker(self): self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up." \ % self.cfg['ringmaster']['idleness-limit']) self.__idlenessDetected = True def cd_to_tempdir(self): dir = self.__get_tempdir() if not os.path.exists(dir): os.makedirs(dir) os.chdir(dir) return dir def getWorkload(self): return self.workload def getHostName(self): return self.__hostname def start(self): """run the thread main loop""" self.log.debug("Entered start method.") hodring = os.path.join(self.cfg['ringmaster']['base-dir'], 'bin', 'hodring') largs = [hodring] targs = self.cfg.get_args(section='hodring') largs.extend(targs) hodringCmd = "" for item in largs: hodringCmd = "%s%s " % (hodringCmd, item) self.log.debug(hodringCmd) if self.np.runWorkers(largs) > 0: self.log.critical("Failed to start worker.") self.log.debug("Returned from runWorkers.") self._finalize() def __findExitCode(self): """Determine the exit code based on the status of the cluster or jobs run on them""" xmlrpcServer = ringMasterServer.instance.logMasterSources if xmlrpcServer.getServiceAddr('hdfs') == 'not found' or \ xmlrpcServer.getServiceAddr('hdfs').startswith("Error: "): self.__exitCode = 7 elif xmlrpcServer.getServiceAddr('mapred') == 'not found' or \ xmlrpcServer.getServiceAddr('mapred').startswith("Error: "): self.__exitCode = 8 else: clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'), 
xmlrpcServer.getServiceAddr('mapred')) if clusterStatus != 0: self.__exitCode = clusterStatus else: self.__exitCode = self.__findHadoopJobsExitCode() self.log.debug('exit code %s' % self.__exitCode) def __findHadoopJobsExitCode(self): """Determine the consolidate exit code of hadoop jobs run on this cluster, provided this information is available. Return 0 otherwise""" ret = 0 failureStatus = 3 failureCount = 0 if self.__jtMonitor: jobStatusList = self.__jtMonitor.getJobsStatus() try: if len(jobStatusList) > 0: for jobStatus in jobStatusList: self.log.debug('job status for %s: %s' % (jobStatus.getJobId(), jobStatus.getStatus())) if jobStatus.getStatus() == failureStatus: failureCount = failureCount+1 if failureCount > 0: if failureCount == len(jobStatusList): # all jobs failed ret = 16 else: ret = 17 except: self.log.debug('exception in finding hadoop jobs exit code' % get_exception_string()) return ret def stop(self): self.log.debug("RingMaster stop method invoked.") if self.__stopInProgress or self.__isStopped: return self.__stopInProgress = True if ringMasterServer.instance is not None: self.log.debug('finding exit code') self.__findExitCode() self.log.debug('stopping ringmaster instance') ringMasterServer.stopService() else: self.__exitCode = 6 if self.__jtMonitor is not None: self.__jtMonitor.stop() if self.httpServer: self.httpServer.stop() self.__clean_up() self.__isStopped = True def shouldStop(self): """Indicates whether the main loop should exit, either due to idleness condition, or a stop signal was received""" return self.__idlenessDetected or self.__isStopped def getExitCode(self): """return the exit code of the program""" return self.__exitCodedef main(cfg,log): try: rm = None dGen = DescGenerator(cfg) cfg = dGen.initializeDesc() rm = RingMaster(cfg, log) rm.start() while not rm.shouldStop(): time.sleep(1) rm.stop() log.debug('returning from main') return rm.getExitCode() except Exception, e: if log: log.critical(get_exception_string()) raise 
Exception(e)
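# [code-viewer UI residue] Keyboard shortcuts:
#   Copy code: Ctrl + C
#   Search code: Ctrl + F
#   Full-screen mode: F11
#   Toggle theme: Ctrl + Shift + D
#   Show shortcuts: ?
#   Increase font size: Ctrl + =
#   Decrease font size: Ctrl + -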