📄 hodring.py
字号:
# NOTE(review): recovered from a whitespace-flattened copy of hodring.py
# (Hadoop On Demand; Python 2 throughout -- note `print >>`, `iteritems`,
# `has_key`, `except X, e`, `raise E, msg`, `file()`).  The indentation below
# is a reconstruction of the destroyed layout; every code token is unchanged.

  # --- tail of a method whose `def` line lies above this excerpt ------------
  # From its behaviour (serialising config attributes into
  # <confdir>/hadoop-site.xml) this is presumably the HadoopCommand method
  # that writes hadoop-site.xml -- confirm against the complete file.

    # Build a <configuration> DOM document and mark it auto-generated.
    doc = implementation.createDocument('', 'configuration', None)
    comment = doc.createComment("This is an auto generated hadoop-site.xml, do not modify")
    topElement = doc.documentElement
    topElement.appendChild(comment)
    # Emit the "final" attributes first (final=True adds a <final> child).
    finalAttr = self.desc.getfinalAttrs()
    self.createXML(doc, finalAttr, topElement, True)
    # Emit the remaining attributes, skipping keys already written as final.
    attr = {}
    attr1 = self.desc.getAttrs()
    for k,v in attr1.iteritems():
      if not finalAttr.has_key(k):
        attr[k] = v
    self.createXML(doc, attr, topElement, False)
    # Serialise the DOM to <confdir>/hadoop-site.xml.
    siteName = os.path.join(self.confdir, "hadoop-site.xml")
    sitefile = file(siteName, 'w')
    print >> sitefile, topElement.toxml()
    sitefile.close()
    self.log.debug('created %s' % (siteName))

  def _createHadoopLogDir(self):
    # Create self.logdir.  On restart the directory may legitimately already
    # exist; on a fresh run it must not.
    if self.restart:
      if not os.path.exists(self.logdir):
        os.makedirs(self.logdir)
    else:
      # NOTE(review): `assert` is stripped under `python -O`; an explicit
      # check that raises would be safer for this runtime pre-condition.
      assert os.path.exists(self.logdir) == False
      os.makedirs(self.logdir)

  def _createXmlElement(self, doc, name, value, description, final):
    """Build and return a <property> element with <name>, <value> and
    <description> children; append <final>true</final> when `final` is
    truthy."""
    prop = doc.createElement("property")
    nameP = doc.createElement("name")
    string = doc.createTextNode(name)
    nameP.appendChild(string)
    valueP = doc.createElement("value")
    string = doc.createTextNode(value)
    valueP.appendChild(string)
    desc = doc.createElement("description")
    string = doc.createTextNode(description)
    desc.appendChild(string)
    prop.appendChild(nameP)
    prop.appendChild(valueP)
    prop.appendChild(desc)
    if (final):
      felement = doc.createElement("final")
      string = doc.createTextNode("true")
      felement.appendChild(string)
      prop.appendChild(felement)
      pass
    return prop

  def getMRSystemDirectoryManager(self):
    """Return an MRSystemDirectoryManager bound to the running hadoop
    process's pid, the mapred system dir and the configured filesystem."""
    return MRSystemDirectoryManager(self.__hadoopThread.getPid(), self.__mrSysDir, \
                                    self.desc.getfinalAttrs()['fs.default.name'], \
                                    self.path, self.log)

  def run(self, dir):
    """Launch the hadoop command described by self.desc from the unpacked
    package under `dir`: build argv and environment, redirect stdio, feed
    optional stdin, and either wait (foreground) or poll the exit status.

    Returns True on success (or when failures are ignored), False otherwise.
    """
    status = True
    args = []
    desc = self.desc
    self.log.debug(pprint.pformat(desc.dict))
    self.log.debug("Got package dir of %s" % dir)

    self.path = os.path.join(dir, self.program)
    self.log.debug("path: %s" % self.path)
    args.append(self.path)
    args.extend(desc.getArgv())
    envs = desc.getEnvs()
    # NOTE(review): fenvs aliases os.environ, so the loop below mutates the
    # hodring process's own environment rather than a private copy.
    fenvs = os.environ
    for k, v in envs.iteritems():
      fenvs[k] = v
    if envs.has_key('HADOOP_OPTS'):
      fenvs['HADOOP_OPTS'] = envs['HADOOP_OPTS']
      self.log.debug("HADOOP_OPTS : %s" % fenvs['HADOOP_OPTS'])
    fenvs['JAVA_HOME'] = self.javahome
    fenvs['HADOOP_CONF_DIR'] = self.confdir
    fenvs['HADOOP_LOG_DIR'] = self.logdir
    self.log.info(pprint.pformat(fenvs))

    # Flatten argv into a single space-separated shell command string.
    hadoopCommand = ''
    for item in args:
      hadoopCommand = "%s%s " % (hadoopCommand, item)

    # Redirecting output and error to self.out and self.err
    hadoopCommand = hadoopCommand + ' 1>%s 2>%s ' % (self.out, self.err)

    self.log.debug('running command: %s' % (hadoopCommand))
    self.log.debug('hadoop env: %s' % fenvs)
    self.log.debug('Command stdout will be redirected to %s ' % self.out + \
                   'and command stderr to %s' % self.err)

    self.__hadoopThread = simpleCommand('hadoop', hadoopCommand, env=fenvs)
    self.__hadoopThread.start()

    # Busy-wait until the child's stdin pipe becomes available.
    while self.__hadoopThread.stdin == None:
      time.sleep(.2)
      self.log.debug("hadoopThread still == None ...")

    input = desc.getStdin()
    self.log.debug("hadoop input: %s" % input)
    if input:
      if self.__hadoopThread.is_running():
        print >>self.__hadoopThread.stdin, input
      else:
        self.log.error("hadoop command failed to start")

    # NOTE(review): the nesting of this close() relative to the `if input:`
    # block is ambiguous in the flattened source; reconstructed at the outer
    # level -- confirm against the original file.
    self.__hadoopThread.stdin.close()

    self.log.debug("isForground: %s" % desc.isForeground())
    if desc.isForeground():
      self.log.debug("Waiting on hadoop to finish...")
      self.__hadoopThread.wait()

      self.log.debug("Joining hadoop thread...")
      self.__hadoopThread.join()
      if self.__hadoopThread.exit_code() != 0:
        status = False
    else:
      status = self.getCommandStatus()

    self.log.debug("hadoop run status: %s" % status)

    if status == False:
      self.handleFailedCommand()

    if (status == True) or (not desc.isIgnoreFailures()):
      return status
    else:
      self.log.error("Ignoring Failure")
      return True

  def kill(self):
    """Kill the hadoop child process and reap its runner thread."""
    # NOTE(review): the None-guard comes *after* the kill() call, so calling
    # kill() before run() has set __hadoopThread would raise AttributeError;
    # the guard should wrap both statements.
    self.__hadoopThread.kill()
    if self.__hadoopThread:
      self.__hadoopThread.join()

  def addCleanup(self, list):
    """Append this command's work dirs and conf dir to `list` for cleanup."""
    list.extend(self.workdirs)
    list.append(self.confdir)

  def getCommandStatus(self):
    """Return False iff the hadoop process exited with a non-zero code.
    A None exit code (process still running) counts as success here."""
    status = True
    ec = self.__hadoopThread.exit_code()
    if (ec != 0) and (ec != None):
      status = False
    return status

  def handleFailedCommand(self):
    """Log the failure and accumulate the redirected stderr contents into
    self.stdErrContents so they can be reported back to the user."""
    self.log.error('hadoop error: %s' % (
                     self.__hadoopThread.exit_status_string()))
    # read the contents of redirected stderr to print information back to user
    if os.path.exists(self.err):
      f = None
      try:
        f = open(self.err)
        lines = f.readlines()
        # format
        for line in lines:
          self.stdErrContents = "%s%s" % (self.stdErrContents, line)
      finally:
        if f is not None:
          f.close()
    self.log.error('See %s.out and/or %s.err for details. They are ' % \
                   (self.name, self.name) + \
                   'located at subdirectories under either ' + \
                   'hodring.work-dirs or hodring.log-destination-uri.')

class HodRing(hodBaseService):
  """The main class for hodring that polls the commands it runs"""

  def __init__(self, config):
    hodBaseService.__init__(self, 'hodring', config['hodring'])
    self.log = self.logs['main']
    self._http = None
    self.__pkg = None           # root directory name inside the hadoop tarball
    self.__pkgDir = None        # absolute path of the unpacked package
    self.__tempDir = None       # per-user, per-service-id scratch directory
    self.__running = {}         # commands currently being managed
    self.__hadoopLogDirs = []   # log dir of every HadoopCommand started
    self.__init_temp_dir()

  def __init_temp_dir(self):
    # Create <temp-dir>/<userid>.<service-id>.hodring and chdir into it, so
    # later relative paths (tarball fetch, untar) land in the scratch area.
    self.__tempDir = os.path.join(self._cfg['temp-dir'],
                                  "%s.%s.hodring" % (self._cfg['userid'],
                                                     self._cfg['service-id']))
    if not os.path.exists(self.__tempDir):
      os.makedirs(self.__tempDir)
    os.chdir(self.__tempDir)

  def __fetch(self, url, spath):
    """Download `url` to local path `spath` in 80 KiB chunks, retrying up to
    3 times.  Raises IOError when all attempts fail."""
    retry = 3
    success = False
    while (retry != 0 and success != True):
      try:
        input = urllib.urlopen(url)
        bufsz = 81920
        buf = input.read(bufsz)
        out = open(spath, 'w')
        while len(buf) > 0:
          out.write(buf)
          buf = input.read(bufsz)
        input.close()
        out.close()
        success = True
      except:
        # NOTE(review): bare except swallows every error (including
        # KeyboardInterrupt) and can leak the open handles on failure.
        self.log.debug("Failed to copy file")
        retry = retry - 1
    if (retry == 0 and success != True):
      raise IOError, "Failed to copy the files"

  def __get_name(self, addr):
    # Return the part of the URL path after its first '/', i.e. the
    # tarball's file name as served by the source.
    parsedUrl = urlparse(addr)
    path = parsedUrl[2]
    split = path.split('/', 1)
    return split[1]

  def __get_dir(self, name):
    """Return the root directory inside the tarball
    specified by name. Assumes that the tarball begins
    with a root directory."""
    import tarfile
    myTarFile = tarfile.open(name)
    hadoopPackage = myTarFile.getnames()[0]
    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
    return hadoopPackage

  def getRunningValues(self):
    # Currently-managed command objects.
    return self.__running.values()

  def getTempDir(self):
    return self.__tempDir

  def getHadoopLogDirs(self):
    return self.__hadoopLogDirs

  def __download_package(self, ringClient):
    """Fetch the hadoop tarball advertised by the ringmaster, untar it under
    the temp dir, and re-advertise this hodring's local HTTP copy as an
    additional source for other hodrings."""
    self.log.debug("Found download address: %s" % self._cfg['download-addr'])
    try:
      addr = 'none'
      downloadTime = self._cfg['tarball-retry-initial-time']
      # download time depends on tarball size and network bandwidth
      increment = 0

      addr = ringClient.getTarList(self.hostname)

      # Poll with a linearly growing, randomised backoff until a source is
      # available ('none' means no tarball source yet).
      while(addr == 'none'):
        rand = self._cfg['tarball-retry-initial-time'] + increment + \
                 random.uniform(0,self._cfg['tarball-retry-interval'])
        increment = increment + 1
        self.log.debug("got no tarball. Retrying again in %s seconds." % rand)
        time.sleep(rand)
        addr = ringClient.getTarList(self.hostname)

      self.log.debug("got this address %s" % addr)

      tarName = self.__get_name(addr)
      self.log.debug("tar package name: %s" % tarName)

      fetchPath = os.path.join(os.getcwd(), tarName)
      self.log.debug("fetch path: %s" % fetchPath)

      self.__fetch(addr, fetchPath)
      self.log.debug("done fetching")

      # Register our own embedded HTTP server as a new tarball source.
      tarUrl = "http://%s:%d/%s" % (self._http.server_address[0],
                                    self._http.server_address[1],
                                    tarName)
      try:
        ringClient.registerTarSource(self.hostname, tarUrl,addr)
        #ringClient.tarDone(addr)
      except KeyError, e:
        self.log.error("registerTarSource and tarDone failed: ", e)
        raise KeyError(e)

      check = untar(fetchPath, os.getcwd())

      if (check == False):
        raise IOError, "Untarring failed."

      self.__pkg = self.__get_dir(tarName)
      self.__pkgDir = os.path.join(os.getcwd(), self.__pkg)
    except Exception, e:
      self.log.error("Failed download tar package: %s" %
                     get_exception_error_string())
      raise Exception(e)

  def __run_hadoop_commands(self, restart=True):
    # NOTE(review): `id` shadows the builtin of the same name.
    id = 0
    for desc in self._cfg['commanddesc']:
      self.log.debug(pprint.pformat(desc.dict))
      mrSysDir = getMapredSystemDirectory(self._cfg['mapred-system-dir-root'],
                                          self._cfg['userid'],
                                          self._cfg['service-id'])
      self.log.debug('mrsysdir is %s' % mrSysDir)
      cmd = HadoopCommand(id, desc, self.__tempDir, self.__pkgDir, self.log,
                          self._cfg['java-home'], mrSysDir, restart)

      self.__hadoopLogDirs.append(cmd.logdir)
      self.log.debug("hadoop log directory: %s" % self.__hadoopLogDirs)
      # (method body continues beyond this excerpt)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -