⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hodring.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
    doc = implementation.createDocument('', 'configuration', None)    comment = doc.createComment("This is an auto generated hadoop-site.xml, do not modify")    topElement = doc.documentElement    topElement.appendChild(comment)        finalAttr = self.desc.getfinalAttrs()    self.createXML(doc, finalAttr, topElement, True)    attr = {}    attr1 = self.desc.getAttrs()    for k,v in attr1.iteritems():      if not finalAttr.has_key(k):        attr[k] = v    self.createXML(doc, attr, topElement, False)                      siteName = os.path.join(self.confdir, "hadoop-site.xml")    sitefile = file(siteName, 'w')    print >> sitefile, topElement.toxml()    sitefile.close()    self.log.debug('created %s' % (siteName))  def _createHadoopLogDir(self):    if self.restart:      if not os.path.exists(self.logdir):        os.makedirs(self.logdir)    else:      assert os.path.exists(self.logdir) == False      os.makedirs(self.logdir)  def _createXmlElement(self, doc, name, value, description, final):    prop = doc.createElement("property")    nameP = doc.createElement("name")    string = doc.createTextNode(name)    nameP.appendChild(string)    valueP = doc.createElement("value")    string = doc.createTextNode(value)    valueP.appendChild(string)    desc = doc.createElement("description")    string = doc.createTextNode(description)    desc.appendChild(string)    prop.appendChild(nameP)    prop.appendChild(valueP)    prop.appendChild(desc)    if (final):      felement = doc.createElement("final")      string = doc.createTextNode("true")      felement.appendChild(string)      prop.appendChild(felement)      pass        return prop  def getMRSystemDirectoryManager(self):    return MRSystemDirectoryManager(self.__hadoopThread.getPid(), self.__mrSysDir, \                                    self.desc.getfinalAttrs()['fs.default.name'], \                                    self.path, self.log)  def run(self, dir):    status = True    args = []    desc = self.desc        self.log.debug(pprint.pformat(desc.dict))            self.log.debug("Got package dir of %s" % dir)        self.path = os.path.join(dir, self.program)        self.log.debug("path: %s" % self.path)    args.append(self.path)    args.extend(desc.getArgv())    envs = desc.getEnvs()    fenvs = os.environ        for k, v in envs.iteritems():      fenvs[k] = v        if envs.has_key('HADOOP_OPTS'):      fenvs['HADOOP_OPTS'] = envs['HADOOP_OPTS']      self.log.debug("HADOOP_OPTS : %s" % fenvs['HADOOP_OPTS'])        fenvs['JAVA_HOME'] = self.javahome    fenvs['HADOOP_CONF_DIR'] = self.confdir    fenvs['HADOOP_LOG_DIR'] = self.logdir    self.log.info(pprint.pformat(fenvs))    hadoopCommand = ''    for item in args:        hadoopCommand = "%s%s " % (hadoopCommand, item)    # Redirecting output and error to self.out and self.err    hadoopCommand = hadoopCommand + ' 1>%s 2>%s ' % (self.out, self.err)            self.log.debug('running command: %s' % (hadoopCommand))     self.log.debug('hadoop env: %s' % fenvs)    self.log.debug('Command stdout will be redirected to %s ' % self.out + \                   'and command stderr to %s' % self.err)    self.__hadoopThread = simpleCommand('hadoop', hadoopCommand, env=fenvs)    self.__hadoopThread.start()        while self.__hadoopThread.stdin == None:      time.sleep(.2)      self.log.debug("hadoopThread still == None ...")        input = desc.getStdin()    self.log.debug("hadoop input: %s" % input)    if input:      if self.__hadoopThread.is_running():        print >>self.__hadoopThread.stdin, input      else:        self.log.error("hadoop command failed to start")        self.__hadoopThread.stdin.close()          self.log.debug("isForground: %s" % desc.isForeground())    if desc.isForeground():      self.log.debug("Waiting on hadoop to finish...")      self.__hadoopThread.wait()            self.log.debug("Joining hadoop thread...")      self.__hadoopThread.join()      if self.__hadoopThread.exit_code() != 0:        status = False    else:      status = self.getCommandStatus()            self.log.debug("hadoop run status: %s" % status)            if status == False:      self.handleFailedCommand()       if (status == True) or (not desc.isIgnoreFailures()):      return status    else:      self.log.error("Ignoring Failure")      return True  def kill(self):    self.__hadoopThread.kill()    if self.__hadoopThread:      self.__hadoopThread.join()  def addCleanup(self, list):    list.extend(self.workdirs)    list.append(self.confdir)  def getCommandStatus(self):    status = True    ec = self.__hadoopThread.exit_code()    if (ec != 0) and (ec != None):      status = False    return status  def handleFailedCommand(self):    self.log.error('hadoop error: %s' % (                     self.__hadoopThread.exit_status_string()))    # read the contents of redirected stderr to print information back to user    if os.path.exists(self.err):      f = None      try:        f = open(self.err)        lines = f.readlines()        # format        for line in lines:          self.stdErrContents = "%s%s" % (self.stdErrContents, line)      finally:        if f is not None:          f.close()    self.log.error('See %s.out and/or %s.err for details. They are ' % \                   (self.name, self.name) + \                   'located at subdirectories under either ' + \                   'hodring.work-dirs or hodring.log-destination-uri.')class HodRing(hodBaseService):  """The main class for hodring that  polls the commands it runs"""  def __init__(self, config):    hodBaseService.__init__(self, 'hodring', config['hodring'])    self.log = self.logs['main']    self._http = None    self.__pkg = None    self.__pkgDir = None     self.__tempDir = None    self.__running = {}    self.__hadoopLogDirs = []    self.__init_temp_dir()  def __init_temp_dir(self):    self.__tempDir = os.path.join(self._cfg['temp-dir'],                                   "%s.%s.hodring" % (self._cfg['userid'],                                                       self._cfg['service-id']))    if not os.path.exists(self.__tempDir):      os.makedirs(self.__tempDir)    os.chdir(self.__tempDir)    def __fetch(self, url, spath):    retry = 3    success = False    while (retry != 0 and success != True):      try:        input = urllib.urlopen(url)        bufsz = 81920        buf = input.read(bufsz)        out = open(spath, 'w')        while len(buf) > 0:          out.write(buf)          buf = input.read(bufsz)        input.close()        out.close()        success = True      except:        self.log.debug("Failed to copy file")        retry = retry - 1    if (retry == 0 and success != True):      raise IOError, "Failed to copy the files"        def __get_name(self, addr):    parsedUrl = urlparse(addr)    path = parsedUrl[2]    split = path.split('/', 1)    return split[1]  def __get_dir(self, name):    """Return the root directory inside the tarball    specified by name. Assumes that the tarball begins    with a root directory."""    import tarfile    myTarFile = tarfile.open(name)    hadoopPackage = myTarFile.getnames()[0]    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))    return hadoopPackage  def getRunningValues(self):    return self.__running.values()  def getTempDir(self):    return self.__tempDir  def getHadoopLogDirs(self):    return self.__hadoopLogDirs   def __download_package(self, ringClient):    self.log.debug("Found download address: %s" %                    self._cfg['download-addr'])    try:      addr = 'none'      downloadTime = self._cfg['tarball-retry-initial-time']           # download time depends on tarball size and network bandwidth            increment = 0            addr = ringClient.getTarList(self.hostname)      while(addr == 'none'):        rand = self._cfg['tarball-retry-initial-time'] + increment + \                        random.uniform(0,self._cfg['tarball-retry-interval'])        increment = increment + 1        self.log.debug("got no tarball. Retrying again in %s seconds." % rand)        time.sleep(rand)        addr = ringClient.getTarList(self.hostname)          self.log.debug("got this address %s" % addr)            tarName = self.__get_name(addr)      self.log.debug("tar package name: %s" % tarName)            fetchPath = os.path.join(os.getcwd(), tarName)       self.log.debug("fetch path: %s" % fetchPath)            self.__fetch(addr, fetchPath)      self.log.debug("done fetching")          tarUrl = "http://%s:%d/%s" % (self._http.server_address[0],                                     self._http.server_address[1],                                     tarName)      try:         ringClient.registerTarSource(self.hostname, tarUrl,addr)        #ringClient.tarDone(addr)      except KeyError, e:        self.log.error("registerTarSource and tarDone failed: ", e)        raise KeyError(e)            check = untar(fetchPath, os.getcwd())            if (check == False):        raise IOError, "Untarring failed."            self.__pkg = self.__get_dir(tarName)      self.__pkgDir = os.path.join(os.getcwd(), self.__pkg)          except Exception, e:      self.log.error("Failed download tar package: %s" %                      get_exception_error_string())      raise Exception(e)        def __run_hadoop_commands(self, restart=True):    id = 0    for desc in self._cfg['commanddesc']:      self.log.debug(pprint.pformat(desc.dict))      mrSysDir = getMapredSystemDirectory(self._cfg['mapred-system-dir-root'],                          self._cfg['userid'], self._cfg['service-id'])      self.log.debug('mrsysdir is %s' % mrSysDir)      cmd = HadoopCommand(id, desc, self.__tempDir, self.__pkgDir, self.log,                           self._cfg['java-home'], mrSysDir, restart)          self.__hadoopLogDirs.append(cmd.logdir)      self.log.debug("hadoop log directory: %s" % self.__hadoopLogDirs)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -