⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ringmaster.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
#        ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)#        self.log.debug("Invoking clusterStop on " + ringXRAddress)#        ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())#        ring['thread'] = ringThread#        ringThread.start()##      for ring in ringList:#        ringThread = ring['thread']#        if ringThread == None:#          raise Exception("Could not get hodring thread.")#        ringThread.join()#        self.log.debug("Completed clusterStop on " + ring['xrs'])##    except:#      self.log.debug(get_exception_string())#      return False##    self.log.debug("Successfully stopped cluster.")#    #    return True  def getCommand(self, addr):    """This method is called by the    hodrings to get commands from    the ringmaster"""    lock = self.cmdLock    cmdList = []    lock.acquire()    try:      try:        for v in self.serviceDict.itervalues():          if (not v.isExternal()):            if v.isLaunchable(self.serviceDict):              # If a master is still not launched, or the number of               # retries for launching master is not reached,               # launch master              if not v.isMasterLaunched() and \                  (v.getMasterFailureCount() <= \                      self.cfg['ringmaster']['max-master-failures']):                cmdList = v.getMasterCommands(self.serviceDict)                v.setlaunchedMaster()                v.setMasterAddress(addr)                break        if cmdList == []:          for s in self.serviceDict.itervalues():            if (not v.isExternal()):              if s.isMasterInitialized():                cl = s.getWorkerCommands(self.serviceDict)                cmdList.extend(cl)              else:                cmdList = []                break      except:        self.log.debug(get_exception_string())    finally:      lock.release()      pass        cmd = addr + pformat(cmdList)    self.log.debug("getCommand returning " + cmd)    return cmdList    def getAdminCommand(self, addr):    """This method is called by the    hodrings to get admin commands from    the ringmaster"""    lock = self.cmdLock    cmdList = []    lock.acquire()    try:      try:        for v in self.serviceDict.itervalues():          cmdList = v.getAdminCommands(self.serviceDict)          if cmdList != []:            break      except Exception, e:        self.log.debug(get_exception_string())    finally:      lock.release()      pass    cmd = addr + pformat(cmdList)    self.log.debug("getAdminCommand returning " + cmd)    return cmdList  def addMasterParams(self, addr, vals):    """This method is called by    hodring to update any parameters    its changed for the commands it was    running"""    self.log.debug('Comment: adding master params from %s' % addr)    self.log.debug(pformat(vals))    lock = self.masterParamLock    lock.acquire()    try:      for v in self.serviceDict.itervalues():        if v.isMasterLaunched():          if (v.getMasterAddress() == addr):            v.setMasterParams(vals)            v.setMasterInitialized()    except:      self.log.debug(get_exception_string())      pass    lock.release()                return addr  def setHodRingErrors(self, addr, errors):    """This method is called by the hodrings to update errors       it encountered while starting up"""    self.log.critical("Hodring at %s failed with following errors:\n%s" \                        % (addr, errors))    lock = self.masterParamLock    lock.acquire()    try:      for v in self.serviceDict.itervalues():        if v.isMasterLaunched():          if (v.getMasterAddress() == addr):            # strip the PID part.            idx = addr.rfind('_')            if idx is not -1:              addr = addr[:idx]            v.setMasterFailed("Hodring at %s failed with following" \                                " errors:\n%s" % (addr, errors))    except:      self.log.debug(get_exception_string())      pass    lock.release()    return True  def getKeys(self):    lock= self.masterParamLock    lock.acquire()    keys = self.serviceDict.keys()    lock.release()          return keys    def getServiceAddr(self, name):    addr = 'not found'    self.log.debug("getServiceAddr name: %s" % name)    lock= self.masterParamLock    lock.acquire()    try:      service = self.serviceDict[name]    except KeyError:      pass    else:      self.log.debug("getServiceAddr service: %s" % service)      # Check if we should give up ! If the limit on max failures is hit,       # give up.      err = service.getMasterFailed()      if (err is not None) and \            (service.getMasterFailureCount() > \                      self.cfg['ringmaster']['max-master-failures']):        self.log.critical("Detected errors (%s) beyond allowed number"\                            " of failures (%s). Flagging error to client" \                            % (service.getMasterFailureCount(), \                              self.cfg['ringmaster']['max-master-failures']))        addr = "Error: " + err      elif (service.isMasterInitialized()):        addr = service.getMasterAddrs()[0]      else:        addr = 'not found'    lock.release()    self.log.debug("getServiceAddr addr %s: %s" % (name, addr))        return addr  def getURLs(self, name):    addr = 'none'    lock = self.masterParamLock    lock.acquire()        try:      service = self.serviceDict[name]    except KeyError:      pass    else:      if (service.isMasterInitialized()):        addr = service.getInfoAddrs()[0]          lock.release()        return addr  def stopRM(self):    """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""    # We spawn a thread here because we want the XMLRPC call to return. Calling    # stop directly from here will also stop the XMLRPC server.    try:      self.log.debug("inside xml-rpc call to stop ringmaster")      rmStopperThread = func('RMStopper', self.rm.stop)      rmStopperThread.start()      self.log.debug("returning from xml-rpc call to stop ringmaster")      return True    except:      self.log.debug("Exception in stop: %s" % get_exception_string())      return Falseclass RingMaster:  def __init__(self, cfg, log, **kwds):    """starts nodepool and services"""    self.download = False    self.httpServer = None    self.cfg = cfg    self.log = log    self.__hostname = local_fqdn()    self.workDirs = None     # ref to the idle job tracker object.    self.__jtMonitor = None    self.__idlenessDetected = False    self.__stopInProgress = False    self.__isStopped = False # to let main exit    self.__exitCode = 0 # exit code with which the ringmaster main method should return    self.__initialize_signal_handlers()        sdd = self.cfg['servicedesc']    gsvc = None    for key in sdd:      gsvc = sdd[key]      break        npd = self.cfg['nodepooldesc']    self.np = NodePoolUtil.getNodePool(npd, cfg, log)    self.log.debug("Getting service ID.")        self.serviceId = self.np.getServiceId()        self.log.debug("Got service ID: %s" % self.serviceId)    self.tarSrcLoc = None    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):      self.download = True      self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']     self.cd_to_tempdir()    if (self.download):      self.__copy_tarball(os.getcwd())      self.basename = self.__find_tarball_in_dir(os.getcwd())      if self.basename is None:        raise Exception('Did not find tarball copied from %s in %s.'                          % (self.tarSrcLoc, os.getcwd()))          self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])        self.log.debug("Service registry @ %s" % self.serviceAddr)        self.serviceClient = hodXRClient(self.serviceAddr)    self.serviceDict  = {}    try:      sdl = self.cfg['servicedesc']      workDirs = self.getWorkDirs(cfg)      hdfsDesc = sdl['hdfs']      hdfs = None       # Determine hadoop Version      hadoopVers = hadoopVersion(self.__getHadoopDir(), \                                self.cfg['hodring']['java-home'], self.log)           if (hadoopVers['major']==None) or (hadoopVers['minor']==None):        raise Exception('Could not retrive the version of Hadoop.'                        + ' Check the Hadoop installation or the value of the hodring.java-home variable.')      if hdfsDesc.isExternal():        hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))        hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )      else:        hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']))      self.serviceDict[hdfs.getName()] = hdfs            mrDesc = sdl['mapred']      mr = None      if mrDesc.isExternal():        mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))        mr.setMasterParams( self.cfg['gridservice-mapred'] )      else:        mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']))      self.serviceDict[mr.getName()] = mr    except:      self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \                            %s." % get_exception_error_string())      self.log.debug(get_exception_string())      raise    # should not be starting these in a constructor    ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)        self.rpcserver = ringMasterServer.getAddress()        self.httpAddress = None       self.tarAddress = None     hostname = socket.gethostname()    if (self.download):      self.httpServer = threadedHTTPServer(hostname,         self.cfg['ringmaster']['http-port-range'])            self.httpServer.serve_forever()      self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0],                                  self.httpServer.server_address[1])      self.tarAddress = "%s%s" % (self.httpAddress, self.basename)            ringMasterServer.instance.logMasterSources.registerTarSource(hostname,                                                                    self.tarAddress)    else:      self.log.debug("Download not set.")        self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'],       self.serviceId, self.__hostname, 'ringmaster', 'hod'))        if self.cfg['ringmaster']['register']:            if self.httpAddress:        self.serviceClient.registerService(self.cfg['ringmaster']['userid'],           self.serviceId, self.__hostname, 'ringmaster', 'hod', {          'xrs' : self.rpcserver, 'http' : self.httpAddress })      else:        self.serviceClient.registerService(self.cfg['ringmaster']['userid'],           self.serviceId, self.__hostname, 'ringmaster', 'hod', {          'xrs' : self.rpcserver, })        self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)        hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring')    hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_'                                   + getpass.getuser())        self.cfg['hodring']['hodring'] = [hodRingWorkDir,]    self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']    self.cfg['hodring']['service-id'] = self.np.getServiceId()    self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)        if (self.tarSrcLoc != None):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -