📄 hadoop.py
            # (listing resumes inside hadoopCluster.__collect_jobtracker_ui,
            # in the exception handler of the URL-fetch retry loop)
            self.__log.debug(get_exception_string())
            time.sleep(1)

        if input:
          out = None

          self.__log.debug("collecting " + link + "...")
          filename = re.sub(self.mapredInfo, "", link)
          filename = dir + "/" + filename
          filename = re.sub("http://", "", filename)
          filename = re.sub("[\?\&=:]", "_", filename)
          filename = filename + ".html"

          try:
            tempdir, tail = os.path.split(filename)
            if not os.path.exists(tempdir):
              os.makedirs(tempdir)
          except:
            self.__log.debug(get_exception_string())

          out = open(filename, 'w')

          bufSz = 8192

          signal.alarm(10)

          try:
            self.__log.debug("Starting to grab: %s" % link)
            buf = input.read(bufSz)

            while len(buf) > 0:
              # Feed the file into the HTML parser
              parser.feed(buf)

              # Re-write the hrefs in the file
              p = re.compile("\?(.+?)=(.+?)")
              buf = p.sub(r"_\1_\2", buf)
              p = re.compile("&(.+?)=(.+?)")
              buf = p.sub(r"_\1_\2", buf)
              p = re.compile("http://(.+?):(\d+)?")
              buf = p.sub(r"\1_\2/", buf)
              buf = re.sub("href=\"/", "href=\"", buf)
              p = re.compile("href=\"(.+?)\"")
              buf = p.sub(r"href=\1.html", buf)

              out.write(buf)
              buf = input.read(bufSz)

            signal.alarm(0)
            input.close()
            if out:
              out.close()

            self.__log.debug("Finished grabbing: %s" % link)
          except AlarmException:
            if hodInterrupt.isSet():
              raise HodInterruptException()
            if out:
              out.close()
            if input:
              input.close()

            self.__log.debug("Failed to retrieve: %s" % link)
        else:
          self.__log.debug("Failed to retrieve: %s" % link)

      # Get the next link in level traversal order
      link = parser.getNextLink()

    parser.close()

  def check_cluster(self, clusterInfo):
    status = 0

    if 'mapred' in clusterInfo:
      mapredAddress = clusterInfo['mapred'][7:]
      hdfsAddress = clusterInfo['hdfs'][7:]
      status = get_cluster_status(hdfsAddress, mapredAddress)
      if status == 0:
        status = 12
    else:
      status = 15

    return status

  def is_cluster_deallocated(self, jobId):
    """Returns True if the JobId that represents this cluster
       is in the Completed or exiting state."""
    jobInfo = self.__nodePool.getJobInfo(jobId)
    state = None
    if jobInfo is not None and jobInfo.has_key('job_state'):
      state = jobInfo['job_state']
    return ((state == 'C') or (state == 'E'))

  def cleanup(self):
    if self.__nodePool:
      self.__nodePool.finalize()

  def get_job_id(self):
    return self.jobId

  def delete_job(self, jobId):
    '''Delete a job given its ID'''
    ret = 0
    if self.__nodePool:
      ret = self.__nodePool.deleteJob(jobId)
    else:
      raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
    return ret

  def allocate(self, clusterDir, min, max=None):
    status = 0
    self.__svcrgyClient = self.__get_svcrgy_client()

    self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))

    if min < 3:
      self.__log.critical("Minimum nodes must be greater than 2.")
      status = 2
    else:
      nodeSet = self.__nodePool.newNodeSet(min)
      walltime = None
      if self.__cfg['hod'].has_key('walltime'):
        walltime = self.__cfg['hod']['walltime']
      self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)

      if self.jobId:
        jobStatus = None
        try:
          jobStatus = self.__check_job_status()
        except HodInterruptException, h:
          self.__log.info(HOD_INTERRUPTED_MESG)
          self.delete_job(self.jobId)
          self.__log.info("Cluster %s removed from queue." % self.jobId)
          raise h
        else:
          if jobStatus == -1:
            self.delete_job(self.jobId)
            status = 4
            return status

        if jobStatus:
          self.__log.info("Cluster Id %s" % self.jobId)
          try:
            self.ringmasterXRS = self.__get_ringmaster_client()

            self.__log.debug("Ringmaster at : %s" % self.ringmasterXRS)
            ringClient = None
            if self.ringmasterXRS:
              ringClient = hodXRClient(self.ringmasterXRS)

              hdfsStatus, hdfsAddr, self.hdfsInfo = \
                self.__init_hadoop_service('hdfs', ringClient)

              if hdfsStatus:
                self.__log.info("HDFS UI at http://%s" % self.hdfsInfo)

                mapredStatus, mapredAddr, self.mapredInfo = \
                  self.__init_hadoop_service('mapred', ringClient)

                if mapredStatus:
                  self.__log.info("Mapred UI at http://%s" % self.mapredInfo)

                  if self.__cfg['hod'].has_key('update-worker-info') \
                    and self.__cfg['hod']['update-worker-info']:
                    workerInfoMap = {}
                    workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
                    workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
                    if mapredAddr.find(':') != -1:
                      workerInfoMap['Mapred RPC Port'] = mapredAddr.split(':')[1]
                    ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
                    if ret != 0:
                      self.__log.warn('Could not update HDFS and Mapred information. ' \
                                      'User Portal may not show relevant information. ' \
                                      'Error code=%s' % ret)

                  self.__cfg.replace_escape_seqs()

                  # Go generate the client side hadoop-site.xml now
                  # adding final-params as well, just so that conf on
                  # client-side and server-side are (almost) the same
                  clientParams = None
                  serverParams = {}
                  finalServerParams = {}

                  # client-params
                  if self.__cfg['hod'].has_key('client-params'):
                    clientParams = self.__cfg['hod']['client-params']

                  # server-params
                  if self.__cfg['gridservice-mapred'].has_key('server-params'):
                    serverParams.update(\
                      self.__cfg['gridservice-mapred']['server-params'])
                  if self.__cfg['gridservice-hdfs'].has_key('server-params'):
                    # note that if there are params in both mapred and hdfs
                    # sections, the ones in hdfs overwrite the ones in mapred
                    serverParams.update(\
                      self.__cfg['gridservice-hdfs']['server-params'])

                  # final-server-params
                  if self.__cfg['gridservice-mapred'].has_key('final-server-params'):
                    finalServerParams.update(\
                      self.__cfg['gridservice-mapred']['final-server-params'])
                  if self.__cfg['gridservice-hdfs'].has_key('final-server-params'):
                    finalServerParams.update(\
                      self.__cfg['gridservice-hdfs']['final-server-params'])

                  clusterFactor = self.__cfg['hod']['cluster-factor']
                  tempDir = self.__cfg['hod']['temp-dir']
                  if not os.path.exists(tempDir):
                    os.makedirs(tempDir)
                  tempDir = os.path.join(tempDir, self.__cfg['hod']['userid'] \
                                         + "." + self.jobId)
                  mrSysDir = getMapredSystemDirectory(\
                                self.__cfg['hodring']['mapred-system-dir-root'], \
                                self.__cfg['hod']['userid'], self.jobId)
                  self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min, \
                            hdfsAddr, mrSysDir, mapredAddr, clientParams, \
                            serverParams, finalServerParams, \
                            clusterFactor)
                  self.__log.info("hadoop-site.xml at %s" % clusterDir)
                  # end of hadoop-site.xml generation
                else:
                  status = 8
              else:
                status = 7
            else:
              status = 6

            if status != 0:
              self.__log.debug("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
              if ringClient is None:
                self.delete_job(self.jobId)
              else:
                self.__log.debug("Calling rm.stop()")
                ringClient.stopRM()
                self.__log.debug("Returning from rm.stop()")
          except HodInterruptException, h:
            self.__log.info(HOD_INTERRUPTED_MESG)
            if self.ringmasterXRS:
              if ringClient is None:
                ringClient = hodXRClient(self.ringmasterXRS)
              self.__log.debug("Calling rm.stop()")
              ringClient.stopRM()
              self.__log.debug("Returning from rm.stop()")
              self.__log.info("Cluster Shutdown by informing ringmaster.")
            else:
              self.delete_job(self.jobId)
              self.__log.info("Cluster %s removed from queue directly." % self.jobId)
            raise h
        else:
          self.__log.critical("No cluster found, ringmaster failed to run.")
          status = 5

      elif self.jobId == False:
        if exitCode == 188:
          self.__log.critical("Request exceeded maximum resource allocation.")
        else:
          self.__log.critical("Insufficient resources available.")
        status = 4
      else:
        self.__log.critical("Scheduler failure, allocation failed.\n\n")
        status = 4

    if status == 5 or status == 6:
      ringMasterErrors = self.__svcrgyClient.getRMError()
      if ringMasterErrors:
        self.__log.critical("Cluster could not be allocated because" \
                            " of the following errors on the "\
                            "ringmaster host %s.\n%s" % \
                            (ringMasterErrors[0], ringMasterErrors[1]))
        self.__log.debug("Stack trace on ringmaster: %s" % ringMasterErrors[2])

    return status

  def __isRingMasterAlive(self, rmAddr):
    ret = True
    rmSocket = tcpSocket(rmAddr)
    try:
      rmSocket.open()
      rmSocket.close()
    except tcpError:
      ret = False

    return ret

  def deallocate(self, clusterDir, clusterInfo):
    status = 0

    nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'],
                                         id=clusterInfo['jobid'])
    self.mapredInfo = clusterInfo['mapred']
    self.hdfsInfo = clusterInfo['hdfs']

    try:
      if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
        clusterStatus = self.check_cluster(clusterInfo)
        if clusterStatus != 14 and clusterStatus != 10:
          # If JT is still alive
          self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
      else:
        self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
    except HodInterruptException, h:
      # got an interrupt. just pass and proceed to qdel
      pass
    except:
      self.__log.info("Exception in collecting Job tracker logs. Ignoring.")

    rmAddr = None
    if clusterInfo.has_key('ring'):
      # format is http://host:port/ We need host:port
      rmAddr = clusterInfo['ring'][7:]
      if rmAddr.endswith('/'):
        rmAddr = rmAddr[:-1]

    if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
      # Cluster is already dead, don't try to contact ringmaster.
      self.__nodePool.finalize()
      status = 10  # As cluster is dead, we just set the status to 'cluster dead'.
    else:
      xrsAddr = clusterInfo['ring']
      rmClient = hodXRClient(xrsAddr)
      self.__log.debug('calling rm.stop')
      rmClient.stopRM()
      self.__log.debug('completed rm.stop')

    # cleanup hod temp dirs
    tempDir = os.path.join(self.__cfg['hod']['temp-dir'], \
                           self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'])
    if os.path.exists(tempDir):
      shutil.rmtree(tempDir)

    return status


class hadoopScript:
  def __init__(self, conf, execDir):
    self.__environ = os.environ.copy()
    self.__environ['HADOOP_CONF_DIR'] = conf
    self.__execDir = execDir

  def run(self, script):
    scriptThread = simpleCommand(script, script, self.__environ, 4, False,
                                 False, self.__execDir)
    scriptThread.start()
    scriptThread.wait()
    scriptThread.join()

    return scriptThread.exit_code()
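
For reference, here is a small standalone sketch of how the filename-sanitizing substitutions in __collect_jobtracker_ui turn a JobTracker UI link into a local .html path. It is not part of the original file; the address jt.example.com:50030, the directory /tmp/hadoop-ui-logs, and the job id are made-up illustrative values standing in for self.mapredInfo, the hadoop-ui-log-dir setting, and a harvested link.

import re

mapredInfo = "jt.example.com:50030"                # assumed JobTracker UI address (illustrative)
link = mapredInfo + "/jobdetails.jsp?jobid=job_200901010000_0001"
logDir = "/tmp/hadoop-ui-logs"                     # assumed hadoop-ui-log-dir value (illustrative)

filename = re.sub(mapredInfo, "", link)            # strip the UI address prefix
filename = logDir + "/" + filename                 # prepend the log directory
filename = re.sub("http://", "", filename)         # drop any scheme remnant
filename = re.sub(r"[\?\&=:]", "_", filename)      # replace URL metacharacters with '_'
filename = filename + ".html"

print filename
# /tmp/hadoop-ui-logs//jobdetails.jsp_jobid_job_200901010000_0001.html
# (the leading '/' of the stripped link survives, so a doubled slash appears;
#  POSIX paths tolerate this, which is why the original code gets away with it)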