📄 ringmaster.py
字号:
# ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)# self.log.debug("Invoking clusterStop on " + ringXRAddress)# ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())# ring['thread'] = ringThread# ringThread.start()## for ring in ringList:# ringThread = ring['thread']# if ringThread == None:# raise Exception("Could not get hodring thread.")# ringThread.join()# self.log.debug("Completed clusterStop on " + ring['xrs'])## except:# self.log.debug(get_exception_string())# return False## self.log.debug("Successfully stopped cluster.")# # return True def getCommand(self, addr): """This method is called by the hodrings to get commands from the ringmaster""" lock = self.cmdLock cmdList = [] lock.acquire() try: try: for v in self.serviceDict.itervalues(): if (not v.isExternal()): if v.isLaunchable(self.serviceDict): # If a master is still not launched, or the number of # retries for launching master is not reached, # launch master if not v.isMasterLaunched() and \ (v.getMasterFailureCount() <= \ self.cfg['ringmaster']['max-master-failures']): cmdList = v.getMasterCommands(self.serviceDict) v.setlaunchedMaster() v.setMasterAddress(addr) break if cmdList == []: for s in self.serviceDict.itervalues(): if (not v.isExternal()): if s.isMasterInitialized(): cl = s.getWorkerCommands(self.serviceDict) cmdList.extend(cl) else: cmdList = [] break except: self.log.debug(get_exception_string()) finally: lock.release() pass cmd = addr + pformat(cmdList) self.log.debug("getCommand returning " + cmd) return cmdList def getAdminCommand(self, addr): """This method is called by the hodrings to get admin commands from the ringmaster""" lock = self.cmdLock cmdList = [] lock.acquire() try: try: for v in self.serviceDict.itervalues(): cmdList = v.getAdminCommands(self.serviceDict) if cmdList != []: break except Exception, e: self.log.debug(get_exception_string()) finally: lock.release() pass cmd = addr + pformat(cmdList) self.log.debug("getAdminCommand returning " + cmd) return cmdList def addMasterParams(self, addr, vals): """This method is called by hodring to update any parameters its changed for the commands it was running""" self.log.debug('Comment: adding master params from %s' % addr) self.log.debug(pformat(vals)) lock = self.masterParamLock lock.acquire() try: for v in self.serviceDict.itervalues(): if v.isMasterLaunched(): if (v.getMasterAddress() == addr): v.setMasterParams(vals) v.setMasterInitialized() except: self.log.debug(get_exception_string()) pass lock.release() return addr def setHodRingErrors(self, addr, errors): """This method is called by the hodrings to update errors it encountered while starting up""" self.log.critical("Hodring at %s failed with following errors:\n%s" \ % (addr, errors)) lock = self.masterParamLock lock.acquire() try: for v in self.serviceDict.itervalues(): if v.isMasterLaunched(): if (v.getMasterAddress() == addr): # strip the PID part. idx = addr.rfind('_') if idx is not -1: addr = addr[:idx] v.setMasterFailed("Hodring at %s failed with following" \ " errors:\n%s" % (addr, errors)) except: self.log.debug(get_exception_string()) pass lock.release() return True def getKeys(self): lock= self.masterParamLock lock.acquire() keys = self.serviceDict.keys() lock.release() return keys def getServiceAddr(self, name): addr = 'not found' self.log.debug("getServiceAddr name: %s" % name) lock= self.masterParamLock lock.acquire() try: service = self.serviceDict[name] except KeyError: pass else: self.log.debug("getServiceAddr service: %s" % service) # Check if we should give up ! If the limit on max failures is hit, # give up. err = service.getMasterFailed() if (err is not None) and \ (service.getMasterFailureCount() > \ self.cfg['ringmaster']['max-master-failures']): self.log.critical("Detected errors (%s) beyond allowed number"\ " of failures (%s). Flagging error to client" \ % (service.getMasterFailureCount(), \ self.cfg['ringmaster']['max-master-failures'])) addr = "Error: " + err elif (service.isMasterInitialized()): addr = service.getMasterAddrs()[0] else: addr = 'not found' lock.release() self.log.debug("getServiceAddr addr %s: %s" % (name, addr)) return addr def getURLs(self, name): addr = 'none' lock = self.masterParamLock lock.acquire() try: service = self.serviceDict[name] except KeyError: pass else: if (service.isMasterInitialized()): addr = service.getInfoAddrs()[0] lock.release() return addr def stopRM(self): """An XMLRPC call which will spawn a thread to stop the Ringmaster program.""" # We spawn a thread here because we want the XMLRPC call to return. Calling # stop directly from here will also stop the XMLRPC server. try: self.log.debug("inside xml-rpc call to stop ringmaster") rmStopperThread = func('RMStopper', self.rm.stop) rmStopperThread.start() self.log.debug("returning from xml-rpc call to stop ringmaster") return True except: self.log.debug("Exception in stop: %s" % get_exception_string()) return Falseclass RingMaster: def __init__(self, cfg, log, **kwds): """starts nodepool and services""" self.download = False self.httpServer = None self.cfg = cfg self.log = log self.__hostname = local_fqdn() self.workDirs = None # ref to the idle job tracker object. self.__jtMonitor = None self.__idlenessDetected = False self.__stopInProgress = False self.__isStopped = False # to let main exit self.__exitCode = 0 # exit code with which the ringmaster main method should return self.__initialize_signal_handlers() sdd = self.cfg['servicedesc'] gsvc = None for key in sdd: gsvc = sdd[key] break npd = self.cfg['nodepooldesc'] self.np = NodePoolUtil.getNodePool(npd, cfg, log) self.log.debug("Getting service ID.") self.serviceId = self.np.getServiceId() self.log.debug("Got service ID: %s" % self.serviceId) self.tarSrcLoc = None if self.cfg['ringmaster'].has_key('hadoop-tar-ball'): self.download = True self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball'] self.cd_to_tempdir() if (self.download): self.__copy_tarball(os.getcwd()) self.basename = self.__find_tarball_in_dir(os.getcwd()) if self.basename is None: raise Exception('Did not find tarball copied from %s in %s.' % (self.tarSrcLoc, os.getcwd())) self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr']) self.log.debug("Service registry @ %s" % self.serviceAddr) self.serviceClient = hodXRClient(self.serviceAddr) self.serviceDict = {} try: sdl = self.cfg['servicedesc'] workDirs = self.getWorkDirs(cfg) hdfsDesc = sdl['hdfs'] hdfs = None # Determine hadoop Version hadoopVers = hadoopVersion(self.__getHadoopDir(), \ self.cfg['hodring']['java-home'], self.log) if (hadoopVers['major']==None) or (hadoopVers['minor']==None): raise Exception('Could not retrive the version of Hadoop.' + ' Check the Hadoop installation or the value of the hodring.java-home variable.') if hdfsDesc.isExternal(): hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor'])) hdfs.setMasterParams( self.cfg['gridservice-hdfs'] ) else: hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor'])) self.serviceDict[hdfs.getName()] = hdfs mrDesc = sdl['mapred'] mr = None if mrDesc.isExternal(): mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor'])) mr.setMasterParams( self.cfg['gridservice-mapred'] ) else: mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor'])) self.serviceDict[mr.getName()] = mr except: self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \ %s." % get_exception_error_string()) self.log.debug(get_exception_string()) raise # should not be starting these in a constructor ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self) self.rpcserver = ringMasterServer.getAddress() self.httpAddress = None self.tarAddress = None hostname = socket.gethostname() if (self.download): self.httpServer = threadedHTTPServer(hostname, self.cfg['ringmaster']['http-port-range']) self.httpServer.serve_forever() self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0], self.httpServer.server_address[1]) self.tarAddress = "%s%s" % (self.httpAddress, self.basename) ringMasterServer.instance.logMasterSources.registerTarSource(hostname, self.tarAddress) else: self.log.debug("Download not set.") self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'], self.serviceId, self.__hostname, 'ringmaster', 'hod')) if self.cfg['ringmaster']['register']: if self.httpAddress: self.serviceClient.registerService(self.cfg['ringmaster']['userid'], self.serviceId, self.__hostname, 'ringmaster', 'hod', { 'xrs' : self.rpcserver, 'http' : self.httpAddress }) else: self.serviceClient.registerService(self.cfg['ringmaster']['userid'], self.serviceId, self.__hostname, 'ringmaster', 'hod', { 'xrs' : self.rpcserver, }) self.log.debug("Registered with serivce registry: %s." % self.serviceAddr) hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring') hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_' + getpass.getuser()) self.cfg['hodring']['hodring'] = [hodRingWorkDir,] self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr'] self.cfg['hodring']['service-id'] = self.np.getServiceId() self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver) if (self.tarSrcLoc != None):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -