📄 hodring.py
字号:
try: # if the tarball isn't there, we use the pkgs dir given. if self.__pkgDir == None: pkgdir = desc.getPkgDirs() else: pkgdir = self.__pkgDir self.log.debug('This is the packcage dir %s ' % (pkgdir)) if not cmd.run(pkgdir): addnInfo = "" if cmd.stdErrContents is not "": addnInfo = " Information from stderr of the command:\n%s" % (cmd.stdErrContents) raise Exception("Could not launch the %s using %s/bin/hadoop.%s" % (desc.getName(), pkgdir, addnInfo)) except Exception, e: self.log.debug("Exception running hadoop command: %s\n%s" % (get_exception_error_string(), get_exception_string())) self.__running[id] = cmd raise Exception(e) id += 1 if desc.isForeground(): continue self.__running[id-1] = cmd # ok.. now command is running. If this HodRing got jobtracker, # Check if it is ready for accepting jobs, and then only return self.__check_jobtracker(desc, id-1, pkgdir) def __check_jobtracker(self, desc, id, pkgdir): # Check jobtracker status. Return properly if it is ready to accept jobs. # Currently Checks for Jetty to come up, the last thing that can be checked # before JT completes initialisation. To be perfectly reliable, we need # hadoop support name = desc.getName() if name == 'jobtracker': # Yes I am the Jobtracker self.log.debug("Waiting for jobtracker to initialise") version = desc.getVersion() self.log.debug("jobtracker version : %s" % version) hadoopCmd = self.getRunningValues()[id] attrs = hadoopCmd.getFilledInKeyValues() attrs = parseEquals(attrs) jobTrackerAddr = attrs['mapred.job.tracker'] self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr) if version < 16: jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \ attrs['mapred.job.tracker.info.port'] else: jettyAddr = attrs['mapred.job.tracker.http.address'] self.log.debug("Jobtracker jetty : %s" % jettyAddr) # Check for Jetty to come up # For this do a http head, and then look at the status defaultTimeout = socket.getdefaulttimeout() # socket timeout isn`t exposed at httplib level. Setting explicitly. socket.setdefaulttimeout(1) sleepTime = 0.5 jettyStatus = False jettyStatusmsg = "" while sleepTime <= 32: # There is a possibility that the command might fail after a while. # This code will check if the command failed so that a better # error message can be returned to the user. if not hadoopCmd.getCommandStatus(): self.log.critical('Hadoop command found to have failed when ' \ 'checking for jobtracker status') hadoopCmd.handleFailedCommand() addnInfo = "" if hadoopCmd.stdErrContents is not "": addnInfo = " Information from stderr of the command:\n%s" \ % (hadoopCmd.stdErrContents) raise Exception("Could not launch the %s using %s/bin/hadoop.%s" \ % (desc.getName(), pkgdir, addnInfo)) try: jettyConn = httplib.HTTPConnection(jettyAddr) jettyConn.request("HEAD", "/jobtracker.jsp") # httplib inherently retries the following till socket timeout resp = jettyConn.getresponse() if resp.status != 200: # Some problem? jettyStatus = False jettyStatusmsg = "Jetty gave a non-200 response to a HTTP-HEAD" +\ " request. HTTP Status (Code, Msg): (%s, %s)" % \ ( resp.status, resp.reason ) break else: self.log.info("Jetty returned a 200 status (%s)" % resp.reason) self.log.info("JobTracker successfully initialised") return except socket.error: self.log.debug("Jetty gave a socket error. Sleeping for %s" \ % sleepTime) time.sleep(sleepTime) sleepTime = sleepTime * 2 except Exception, e: jettyStatus = False jettyStatusmsg = ("Process(possibly other than jetty) running on" + \ " port assigned to jetty is returning invalid http response") break socket.setdefaulttimeout(defaultTimeout) if not jettyStatus: self.log.critical("Jobtracker failed to initialise.") if jettyStatusmsg: self.log.critical( "Reason: %s" % jettyStatusmsg ) else: self.log.critical( "Reason: Jetty failed to give response") raise Exception("JobTracker failed to initialise") def stop(self): self.log.debug("Entered hodring stop.") if self._http: self.log.debug("stopping http server...") self._http.stop() self.log.debug("call hodsvcrgy stop...") hodBaseService.stop(self) def _xr_method_clusterStart(self, initialize=True): return self.clusterStart(initialize) def _xr_method_clusterStop(self): return self.clusterStop() def start(self): """Run and maintain hodring commands""" try: if self._cfg.has_key('download-addr'): self._http = threadedHTTPServer('', self._cfg['http-port-range']) self.log.info("Starting http server...") self._http.serve_forever() self.log.debug("http://%s:%d" % (self._http.server_address[0], self._http.server_address[1])) hodBaseService.start(self) ringXRAddress = None if self._cfg.has_key('ringmaster-xrs-addr'): ringXRAddress = "http://%s:%s/" % (self._cfg['ringmaster-xrs-addr'][0], self._cfg['ringmaster-xrs-addr'][1]) self.log.debug("Ringmaster at %s" % ringXRAddress) self.log.debug("Creating service registry XML-RPC client.") serviceClient = hodXRClient(to_http_url( self._cfg['svcrgy-addr'])) if ringXRAddress == None: self.log.info("Did not get ringmaster XML-RPC address. Fetching information from service registry.") ringList = serviceClient.getServiceInfo(self._cfg['userid'], self._cfg['service-id'], 'ringmaster', 'hod') self.log.debug(pprint.pformat(ringList)) if len(ringList): if isinstance(ringList, list): ringXRAddress = ringList[0]['xrs'] count = 0 while (ringXRAddress == None and count < 3000): ringList = serviceClient.getServiceInfo(self._cfg['userid'], self._cfg['service-id'], 'ringmaster', 'hod') if len(ringList): if isinstance(ringList, list): ringXRAddress = ringList[0]['xrs'] count = count + 1 time.sleep(.2) if ringXRAddress == None: raise Exception("Could not get ringmaster XML-RPC server address.") self.log.debug("Creating ringmaster XML-RPC client.") ringClient = hodXRClient(ringXRAddress) id = self.hostname + "_" + str(os.getpid()) if 'download-addr' in self._cfg: self.__download_package(ringClient) else: self.log.debug("Did not find a download address.") cmdlist = [] firstTime = True increment = 0 hadoopStartupTime = 2 cmdlist = ringClient.getCommand(id) while (cmdlist == []): if firstTime: sleepTime = increment + self._cfg['cmd-retry-initial-time'] + hadoopStartupTime\ + random.uniform(0,self._cfg['cmd-retry-interval']) firstTime = False else: sleepTime = increment + self._cfg['cmd-retry-initial-time'] + \ + random.uniform(0,self._cfg['cmd-retry-interval']) self.log.debug("Did not get command list. Waiting for %s seconds." % (sleepTime)) time.sleep(sleepTime) increment = increment + 1 cmdlist = ringClient.getCommand(id) self.log.debug(pformat(cmdlist)) cmdDescs = [] for cmds in cmdlist: cmdDescs.append(CommandDesc(cmds['dict'], self.log)) self._cfg['commanddesc'] = cmdDescs self.log.info("Running hadoop commands...") self.__run_hadoop_commands(False) masterParams = [] for k, cmd in self.__running.iteritems(): masterParams.extend(cmd.filledInKeyVals) self.log.debug("printing getparams") self.log.debug(pformat(id)) self.log.debug(pformat(masterParams)) # when this is on a required host, the ringMaster already has our masterParams if(len(masterParams) > 0): ringClient.addMasterParams(id, masterParams) except Exception, e: raise Exception(e) def clusterStart(self, initialize=True): """Start a stopped mapreduce/dfs cluster""" if initialize: self.log.debug('clusterStart Method Invoked - Initialize') else: self.log.debug('clusterStart Method Invoked - No Initialize') try: self.log.debug("Creating service registry XML-RPC client.") serviceClient = hodXRClient(to_http_url(self._cfg['svcrgy-addr']), None, None, 0, 0, 0) self.log.info("Fetching ringmaster information from service registry.") count = 0 ringXRAddress = None while (ringXRAddress == None and count < 3000): ringList = serviceClient.getServiceInfo(self._cfg['userid'], self._cfg['service-id'], 'ringmaster', 'hod') if len(ringList): if isinstance(ringList, list): ringXRAddress = ringList[0]['xrs'] count = count + 1 if ringXRAddress == None: raise Exception("Could not get ringmaster XML-RPC server address.") self.log.debug("Creating ringmaster XML-RPC client.") ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0) id = self.hostname + "_" + str(os.getpid()) cmdlist = [] if initialize: if 'download-addr' in self._cfg: self.__download_package(ringClient) else: self.log.debug("Did not find a download address.") while (cmdlist == []): cmdlist = ringClient.getCommand(id) else: while (cmdlist == []): cmdlist = ringClient.getAdminCommand(id) self.log.debug(pformat(cmdlist)) cmdDescs = [] for cmds in cmdlist: cmdDescs.append(CommandDesc(cmds['dict'], self.log)) self._cfg['commanddesc'] = cmdDescs if initialize: self.log.info("Running hadoop commands again... - Initialize") self.__run_hadoop_commands() masterParams = [] for k, cmd in self.__running.iteritems(): self.log.debug(cmd) masterParams.extend(cmd.filledInKeyVals) self.log.debug("printing getparams") self.log.debug(pformat(id)) self.log.debug(pformat(masterParams)) # when this is on a required host, the ringMaster already has our masterParams if(len(masterParams) > 0): ringClient.addMasterParams(id, masterParams) else: self.log.info("Running hadoop commands again... - No Initialize") self.__run_hadoop_commands() except: self.log.error(get_exception_string()) return True def clusterStop(self): """Stop a running mapreduce/dfs cluster without stopping the hodring""" self.log.debug('clusterStop Method Invoked') try: for cmd in self.__running.values(): cmd.kill() self.__running = {} except: self.log.error(get_exception_string()) return True
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -