⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hodring.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
            try:        # if the tarball isn't there, we use the pkgs dir given.        if self.__pkgDir == None:          pkgdir = desc.getPkgDirs()        else:          pkgdir = self.__pkgDir        self.log.debug('This is the packcage dir %s ' % (pkgdir))        if not cmd.run(pkgdir):          addnInfo = ""          if cmd.stdErrContents is not "":            addnInfo = " Information from stderr of the command:\n%s" % (cmd.stdErrContents)          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" % (desc.getName(), pkgdir, addnInfo))      except Exception, e:        self.log.debug("Exception running hadoop command: %s\n%s" % (get_exception_error_string(), get_exception_string()))        self.__running[id] = cmd        raise Exception(e)      id += 1      if desc.isForeground():        continue      self.__running[id-1] = cmd      # ok.. now command is running. If this HodRing got jobtracker,       # Check if it is ready for accepting jobs, and then only return      self.__check_jobtracker(desc, id-1, pkgdir)        def __check_jobtracker(self, desc, id, pkgdir):    # Check jobtracker status. Return properly if it is ready to accept jobs.    # Currently Checks for Jetty to come up, the last thing that can be checked    # before JT completes initialisation. To be perfectly reliable, we need     # hadoop support    name = desc.getName()    if name == 'jobtracker':      # Yes I am the Jobtracker      self.log.debug("Waiting for jobtracker to initialise")      version = desc.getVersion()      self.log.debug("jobtracker version : %s" % version)      hadoopCmd = self.getRunningValues()[id]      attrs = hadoopCmd.getFilledInKeyValues()      attrs = parseEquals(attrs)      jobTrackerAddr = attrs['mapred.job.tracker']      self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)      if version < 16:        jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \                              attrs['mapred.job.tracker.info.port']      else:        jettyAddr = attrs['mapred.job.tracker.http.address']      self.log.debug("Jobtracker jetty : %s" % jettyAddr)      # Check for Jetty to come up      # For this do a http head, and then look at the status      defaultTimeout = socket.getdefaulttimeout()      # socket timeout isn`t exposed at httplib level. Setting explicitly.      socket.setdefaulttimeout(1)      sleepTime = 0.5      jettyStatus = False      jettyStatusmsg = ""      while sleepTime <= 32:        # There is a possibility that the command might fail after a while.        # This code will check if the command failed so that a better        # error message can be returned to the user.        if not hadoopCmd.getCommandStatus():          self.log.critical('Hadoop command found to have failed when ' \                            'checking for jobtracker status')          hadoopCmd.handleFailedCommand()          addnInfo = ""          if hadoopCmd.stdErrContents is not "":            addnInfo = " Information from stderr of the command:\n%s" \                                        % (hadoopCmd.stdErrContents)          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" \                                        % (desc.getName(), pkgdir, addnInfo))                  try:          jettyConn = httplib.HTTPConnection(jettyAddr)          jettyConn.request("HEAD", "/jobtracker.jsp")          # httplib inherently retries the following till socket timeout          resp = jettyConn.getresponse()          if resp.status != 200:            # Some problem?            jettyStatus = False            jettyStatusmsg = "Jetty gave a non-200 response to a HTTP-HEAD" +\                             " request. HTTP Status (Code, Msg): (%s, %s)" % \                             ( resp.status, resp.reason )            break          else:            self.log.info("Jetty returned a 200 status (%s)" % resp.reason)            self.log.info("JobTracker successfully initialised")            return        except socket.error:          self.log.debug("Jetty gave a socket error. Sleeping for %s" \                                                                  % sleepTime)          time.sleep(sleepTime)          sleepTime = sleepTime * 2        except Exception, e:          jettyStatus = False          jettyStatusmsg = ("Process(possibly other than jetty) running on" + \                  " port assigned to jetty is returning invalid http response")          break      socket.setdefaulttimeout(defaultTimeout)      if not jettyStatus:        self.log.critical("Jobtracker failed to initialise.")        if jettyStatusmsg:          self.log.critical( "Reason: %s" % jettyStatusmsg )        else: self.log.critical( "Reason: Jetty failed to give response")        raise Exception("JobTracker failed to initialise")  def stop(self):    self.log.debug("Entered hodring stop.")    if self._http:       self.log.debug("stopping http server...")      self._http.stop()        self.log.debug("call hodsvcrgy stop...")    hodBaseService.stop(self)      def _xr_method_clusterStart(self, initialize=True):    return self.clusterStart(initialize)  def _xr_method_clusterStop(self):    return self.clusterStop()   def start(self):    """Run and maintain hodring commands"""        try:      if self._cfg.has_key('download-addr'):        self._http = threadedHTTPServer('', self._cfg['http-port-range'])        self.log.info("Starting http server...")        self._http.serve_forever()        self.log.debug("http://%s:%d" % (self._http.server_address[0],                     self._http.server_address[1]))            hodBaseService.start(self)            ringXRAddress = None      if self._cfg.has_key('ringmaster-xrs-addr'):        ringXRAddress = "http://%s:%s/" % (self._cfg['ringmaster-xrs-addr'][0],                          self._cfg['ringmaster-xrs-addr'][1])        self.log.debug("Ringmaster at %s" % ringXRAddress)      self.log.debug("Creating service registry XML-RPC client.")      serviceClient = hodXRClient(to_http_url(                                  self._cfg['svcrgy-addr']))      if ringXRAddress == None:        self.log.info("Did not get ringmaster XML-RPC address. Fetching information from service registry.")        ringList = serviceClient.getServiceInfo(self._cfg['userid'],             self._cfg['service-id'], 'ringmaster', 'hod')              self.log.debug(pprint.pformat(ringList))              if len(ringList):          if isinstance(ringList, list):            ringXRAddress = ringList[0]['xrs']              count = 0        while (ringXRAddress == None and count < 3000):          ringList = serviceClient.getServiceInfo(self._cfg['userid'],             self._cfg['service-id'], 'ringmaster', 'hod')                  if len(ringList):            if isinstance(ringList, list):              ringXRAddress = ringList[0]['xrs']                  count = count + 1          time.sleep(.2)            if ringXRAddress == None:        raise Exception("Could not get ringmaster XML-RPC server address.")              self.log.debug("Creating ringmaster XML-RPC client.")      ringClient = hodXRClient(ringXRAddress)                id = self.hostname + "_" + str(os.getpid())            if 'download-addr' in self._cfg:        self.__download_package(ringClient)      else:        self.log.debug("Did not find a download address.")                cmdlist = []      firstTime = True      increment = 0      hadoopStartupTime = 2             cmdlist = ringClient.getCommand(id)      while (cmdlist == []):        if firstTime:          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + hadoopStartupTime\                        + random.uniform(0,self._cfg['cmd-retry-interval'])          firstTime = False        else:          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + \                        + random.uniform(0,self._cfg['cmd-retry-interval'])        self.log.debug("Did not get command list. Waiting for %s seconds." % (sleepTime))        time.sleep(sleepTime)        increment = increment + 1        cmdlist = ringClient.getCommand(id)      self.log.debug(pformat(cmdlist))       cmdDescs = []      for cmds in cmdlist:        cmdDescs.append(CommandDesc(cmds['dict'], self.log))        self._cfg['commanddesc'] = cmdDescs            self.log.info("Running hadoop commands...")      self.__run_hadoop_commands(False)              masterParams = []      for k, cmd in self.__running.iteritems():        masterParams.extend(cmd.filledInKeyVals)        self.log.debug("printing getparams")      self.log.debug(pformat(id))      self.log.debug(pformat(masterParams))      # when this is on a required host, the ringMaster already has our masterParams      if(len(masterParams) > 0):        ringClient.addMasterParams(id, masterParams)    except Exception, e:      raise Exception(e)  def clusterStart(self, initialize=True):    """Start a stopped mapreduce/dfs cluster"""    if initialize:      self.log.debug('clusterStart Method Invoked - Initialize')    else:      self.log.debug('clusterStart Method Invoked - No Initialize')    try:      self.log.debug("Creating service registry XML-RPC client.")      serviceClient = hodXRClient(to_http_url(self._cfg['svcrgy-addr']),                                  None, None, 0, 0, 0)      self.log.info("Fetching ringmaster information from service registry.")      count = 0      ringXRAddress = None      while (ringXRAddress == None and count < 3000):        ringList = serviceClient.getServiceInfo(self._cfg['userid'],          self._cfg['service-id'], 'ringmaster', 'hod')        if len(ringList):          if isinstance(ringList, list):            ringXRAddress = ringList[0]['xrs']        count = count + 1      if ringXRAddress == None:        raise Exception("Could not get ringmaster XML-RPC server address.")      self.log.debug("Creating ringmaster XML-RPC client.")      ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0)      id = self.hostname + "_" + str(os.getpid())      cmdlist = []      if initialize:        if 'download-addr' in self._cfg:          self.__download_package(ringClient)        else:          self.log.debug("Did not find a download address.")        while (cmdlist == []):          cmdlist = ringClient.getCommand(id)      else:        while (cmdlist == []):          cmdlist = ringClient.getAdminCommand(id)      self.log.debug(pformat(cmdlist))      cmdDescs = []      for cmds in cmdlist:        cmdDescs.append(CommandDesc(cmds['dict'], self.log))      self._cfg['commanddesc'] = cmdDescs      if initialize:        self.log.info("Running hadoop commands again... - Initialize")        self.__run_hadoop_commands()        masterParams = []        for k, cmd in self.__running.iteritems():          self.log.debug(cmd)          masterParams.extend(cmd.filledInKeyVals)        self.log.debug("printing getparams")        self.log.debug(pformat(id))        self.log.debug(pformat(masterParams))        # when this is on a required host, the ringMaster already has our masterParams        if(len(masterParams) > 0):          ringClient.addMasterParams(id, masterParams)      else:        self.log.info("Running hadoop commands again... - No Initialize")        self.__run_hadoop_commands()    except:      self.log.error(get_exception_string())    return True  def clusterStop(self):    """Stop a running mapreduce/dfs cluster without stopping the hodring"""    self.log.debug('clusterStop Method Invoked')    try:      for cmd in self.__running.values():        cmd.kill()      self.__running = {}    except:      self.log.error(get_exception_string())    return True

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -