
hadoop.py

Source code from the Hadoop 0.18.0 installation (Hadoop On Demand library)
            # (continuation: inside __collect_jobtracker_ui, which walks the
            # JobTracker web UI, saves each page under the log directory and
            # rewrites its links so the pages can be browsed offline)
            self.__log.debug(get_exception_string())
            time.sleep(1)

        if input:
          out = None

          self.__log.debug("collecting " + link + "...")
          filename = re.sub(self.mapredInfo, "", link)
          filename = dir + "/" + filename
          filename = re.sub("http://", "", filename)
          filename = re.sub("[\?\&=:]", "_", filename)
          filename = filename + ".html"

          try:
            tempdir, tail = os.path.split(filename)
            if not os.path.exists(tempdir):
              os.makedirs(tempdir)
          except:
            self.__log.debug(get_exception_string())

          out = open(filename, 'w')

          bufSz = 8192

          signal.alarm(10)

          try:
            self.__log.debug("Starting to grab: %s" % link)
            buf = input.read(bufSz)

            while len(buf) > 0:
              # Feed the file into the HTML parser
              parser.feed(buf)

              # Re-write the hrefs in the file
              p = re.compile("\?(.+?)=(.+?)")
              buf = p.sub(r"_\1_\2", buf)
              p = re.compile("&(.+?)=(.+?)")
              buf = p.sub(r"_\1_\2", buf)
              p = re.compile("http://(.+?):(\d+)?")
              buf = p.sub(r"\1_\2/", buf)
              buf = re.sub("href=\"/", "href=\"", buf)
              p = re.compile("href=\"(.+?)\"")
              buf = p.sub(r"href=\1.html", buf)

              out.write(buf)
              buf = input.read(bufSz)

            signal.alarm(0)
            input.close()
            if out:
              out.close()

            self.__log.debug("Finished grabbing: %s" % link)
          except AlarmException:
            if hodInterrupt.isSet():
              raise HodInterruptException()
            if out: out.close()
            if input: input.close()

            self.__log.debug("Failed to retrieve: %s" % link)
        else:
          self.__log.debug("Failed to retrieve: %s" % link)

      # Get the next link in level traversal order
      link = parser.getNextLink()

    parser.close()

  def check_cluster(self, clusterInfo):
    status = 0
    if 'mapred' in clusterInfo:
      mapredAddress = clusterInfo['mapred'][7:]
      hdfsAddress = clusterInfo['hdfs'][7:]
      status = get_cluster_status(hdfsAddress, mapredAddress)
      if status == 0:
        status = 12
    else:
      status = 15
    return status

  def is_cluster_deallocated(self, jobId):
    """Returns True if the JobId that represents this cluster
       is in the Completed or exiting state."""
    jobInfo = self.__nodePool.getJobInfo(jobId)
    state = None
    if jobInfo is not None and jobInfo.has_key('job_state'):
      state = jobInfo['job_state']
    return ((state == 'C') or (state == 'E'))

  def cleanup(self):
    if self.__nodePool: self.__nodePool.finalize()

  def get_job_id(self):
    return self.jobId

  def delete_job(self, jobId):
    '''Delete a job given its ID'''
    ret = 0
    if self.__nodePool:
      ret = self.__nodePool.deleteJob(jobId)
    else:
      raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
    return ret

  def allocate(self, clusterDir, min, max=None):
    status = 0
    self.__svcrgyClient = self.__get_svcrgy_client()

    self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))

    if min < 3:
      self.__log.critical("Minimum nodes must be greater than 2.")
      status = 2
    else:
      nodeSet = self.__nodePool.newNodeSet(min)
      walltime = None
      if self.__cfg['hod'].has_key('walltime'):
        walltime = self.__cfg['hod']['walltime']
      self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
      if self.jobId:
        jobStatus = None
        try:
          jobStatus = self.__check_job_status()
        except HodInterruptException, h:
          self.__log.info(HOD_INTERRUPTED_MESG)
          self.delete_job(self.jobId)
          self.__log.info("Cluster %s removed from queue." % self.jobId)
          raise h
        else:
          if jobStatus == -1:
            self.delete_job(self.jobId)
            status = 4
            return status

        if jobStatus:
          self.__log.info("Cluster Id %s" % self.jobId)
          try:
            self.ringmasterXRS = self.__get_ringmaster_client()

            self.__log.debug("Ringmaster at : %s" % self.ringmasterXRS)
            ringClient = None
            if self.ringmasterXRS:
              ringClient = hodXRClient(self.ringmasterXRS)

              hdfsStatus, hdfsAddr, self.hdfsInfo = \
                self.__init_hadoop_service('hdfs', ringClient)

              if hdfsStatus:
                self.__log.info("HDFS UI at http://%s" % self.hdfsInfo)

                mapredStatus, mapredAddr, self.mapredInfo = \
                  self.__init_hadoop_service('mapred', ringClient)

                if mapredStatus:
                  self.__log.info("Mapred UI at http://%s" % self.mapredInfo)

                  if self.__cfg['hod'].has_key('update-worker-info') \
                    and self.__cfg['hod']['update-worker-info']:
                    workerInfoMap = {}
                    workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
                    workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
                    if mapredAddr.find(':') != -1:
                      workerInfoMap['Mapred RPC Port'] = mapredAddr.split(':')[1]
                    ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
                    if ret != 0:
                      self.__log.warn('Could not update HDFS and Mapred information.' \
                                      'User Portal may not show relevant information.' \
                                      'Error code=%s' % ret)
                  self.__cfg.replace_escape_seqs()

                  # Go generate the client side hadoop-site.xml now,
                  # adding final-params as well, just so that conf on
                  # client-side and server-side are (almost) the same
                  clientParams = None
                  serverParams = {}
                  finalServerParams = {}

                  # client-params
                  if self.__cfg['hod'].has_key('client-params'):
                    clientParams = self.__cfg['hod']['client-params']

                  # server-params
                  if self.__cfg['gridservice-mapred'].has_key('server-params'):
                    serverParams.update(\
                      self.__cfg['gridservice-mapred']['server-params'])
                  if self.__cfg['gridservice-hdfs'].has_key('server-params'):
                    # note that if there are params in both mapred and hdfs
                    # sections, the ones in hdfs overwrite the ones in mapred
                    serverParams.update(\
                      self.__cfg['gridservice-hdfs']['server-params'])

                  # final-server-params
                  if self.__cfg['gridservice-mapred'].has_key(\
                                                    'final-server-params'):
                    finalServerParams.update(\
                      self.__cfg['gridservice-mapred']['final-server-params'])
                  if self.__cfg['gridservice-hdfs'].has_key(
                                                    'final-server-params'):
                    finalServerParams.update(\
                      self.__cfg['gridservice-hdfs']['final-server-params'])

                  clusterFactor = self.__cfg['hod']['cluster-factor']
                  tempDir = self.__cfg['hod']['temp-dir']
                  if not os.path.exists(tempDir):
                    os.makedirs(tempDir)
                  tempDir = os.path.join(tempDir, self.__cfg['hod']['userid'] \
                                  + "." + self.jobId)
                  mrSysDir = getMapredSystemDirectory(self.__cfg['hodring']['mapred-system-dir-root'], \
                                      self.__cfg['hod']['userid'], self.jobId)
                  self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min, \
                            hdfsAddr, mrSysDir, mapredAddr, clientParams, \
                            serverParams, finalServerParams, \
                            clusterFactor)
                  self.__log.info("hadoop-site.xml at %s" % clusterDir)
                  # end of hadoop-site.xml generation
                else:
                  status = 8
              else:
                status = 7
            else:
              status = 6
            if status != 0:
              self.__log.debug("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
              if ringClient is None:
                self.delete_job(self.jobId)
              else:
                self.__log.debug("Calling rm.stop()")
                ringClient.stopRM()
                self.__log.debug("Returning from rm.stop()")
          except HodInterruptException, h:
            self.__log.info(HOD_INTERRUPTED_MESG)
            if self.ringmasterXRS:
              if ringClient is None:
                ringClient = hodXRClient(self.ringmasterXRS)
              self.__log.debug("Calling rm.stop()")
              ringClient.stopRM()
              self.__log.debug("Returning from rm.stop()")
              self.__log.info("Cluster Shutdown by informing ringmaster.")
            else:
              self.delete_job(self.jobId)
              self.__log.info("Cluster %s removed from queue directly." % self.jobId)
            raise h
        else:
          self.__log.critical("No cluster found, ringmaster failed to run.")
          status = 5
      elif self.jobId == False:
        if exitCode == 188:
          self.__log.critical("Request exceeded maximum resource allocation.")
        else:
          self.__log.critical("Insufficient resources available.")
        status = 4
      else:
        self.__log.critical("Scheduler failure, allocation failed.\n\n")
        status = 4

    if status == 5 or status == 6:
      ringMasterErrors = self.__svcrgyClient.getRMError()
      if ringMasterErrors:
        self.__log.critical("Cluster could not be allocated because" \
                            " of the following errors on the " \
                            "ringmaster host %s.\n%s" % \
                            (ringMasterErrors[0], ringMasterErrors[1]))
        self.__log.debug("Stack trace on ringmaster: %s" % ringMasterErrors[2])
    return status

  def __isRingMasterAlive(self, rmAddr):
    ret = True
    rmSocket = tcpSocket(rmAddr)
    try:
      rmSocket.open()
      rmSocket.close()
    except tcpError:
      ret = False
    return ret

  def deallocate(self, clusterDir, clusterInfo):
    status = 0

    nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'],
                                         id=clusterInfo['jobid'])
    self.mapredInfo = clusterInfo['mapred']
    self.hdfsInfo = clusterInfo['hdfs']
    try:
      if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
        clusterStatus = self.check_cluster(clusterInfo)
        if clusterStatus != 14 and clusterStatus != 10:
          # If JT is still alive
          self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
      else:
        self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
    except HodInterruptException, h:
      # got an interrupt. just pass and proceed to qdel
      pass
    except:
      self.__log.info("Exception in collecting Job tracker logs. Ignoring.")

    rmAddr = None
    if clusterInfo.has_key('ring'):
      # format is http://host:port/ We need host:port
      rmAddr = clusterInfo['ring'][7:]
      if rmAddr.endswith('/'):
        rmAddr = rmAddr[:-1]

    if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
      # Cluster is already dead, don't try to contact ringmaster.
      self.__nodePool.finalize()
      status = 10  # As cluster is dead, we just set the status to 'cluster dead'.
    else:
      xrsAddr = clusterInfo['ring']
      rmClient = hodXRClient(xrsAddr)
      self.__log.debug('calling rm.stop')
      rmClient.stopRM()
      self.__log.debug('completed rm.stop')

    # cleanup hod temp dirs
    tempDir = os.path.join(self.__cfg['hod']['temp-dir'], \
                    self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'])
    if os.path.exists(tempDir):
      shutil.rmtree(tempDir)

    return status


class hadoopScript:
  # Runs a user-supplied script against an allocated cluster, with
  # HADOOP_CONF_DIR pointing at the cluster's generated configuration.
  def __init__(self, conf, execDir):
    self.__environ = os.environ.copy()
    self.__environ['HADOOP_CONF_DIR'] = conf
    self.__execDir = execDir

  def run(self, script):
    scriptThread = simpleCommand(script, script, self.__environ, 4, False,
                                 False, self.__execDir)
    scriptThread.start()
    scriptThread.wait()
    scriptThread.join()

    return scriptThread.exit_code()
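For reference, hadoopScript only sets HADOOP_CONF_DIR to the cluster's generated configuration directory and runs the given script, returning its exit code. A minimal usage sketch follows; the import path and all file paths are illustrative assumptions, not taken from hadoop.py itself.

# Hypothetical usage sketch -- module path and paths below are assumptions.
from hodlib.Hod.hadoop import hadoopScript   # assumed import path

runner = hadoopScript('/path/to/cluster-conf-dir',  # dir containing hadoop-site.xml
                      '/tmp/hod-script-exec')       # working dir for the script
exitCode = runner.run('/path/to/user-job-script.sh')
print "script exit code: %s" % exitCode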
