⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hadoop.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 2 页
字号:
#Licensed to the Apache Software Foundation (ASF) under one#or more contributor license agreements.  See the NOTICE file#distributed with this work for additional information#regarding copyright ownership.  The ASF licenses this file#to you under the Apache License, Version 2.0 (the#"License"); you may not use this file except in compliance#with the License.  You may obtain a copy of the License at#     http://www.apache.org/licenses/LICENSE-2.0#Unless required by applicable law or agreed to in writing, software#distributed under the License is distributed on an "AS IS" BASIS,#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#See the License for the specific language governing permissions and#limitations under the License."""define WorkLoad as abstract interface for user job"""# -*- python -*-import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, mathfrom HTMLParser import HTMLParserimport xml.dom.minidomimport xml.dom.pulldomfrom xml.dom import getDOMImplementationfrom hodlib.Common.util import *from hodlib.Common.xmlrpc import hodXRClientfrom hodlib.Common.miniHTMLParser import miniHTMLParserfrom hodlib.Common.nodepoolutil import NodePoolUtilfrom hodlib.Common.tcp import tcpError, tcpSocketreCommandDelimeterString = r"(?<!\\);"reCommandDelimeter = re.compile(reCommandDelimeterString)class hadoopConfig:  def __create_xml_element(self, doc, name, value, description, final = False):    prop = doc.createElement("property")    nameP = doc.createElement("name")    string = doc.createTextNode(name)    nameP.appendChild(string)    valueP = doc.createElement("value")    string = doc.createTextNode(value)    valueP.appendChild(string)    if final:      finalP = doc.createElement("final")      string = doc.createTextNode("true")      finalP.appendChild(string)    desc = doc.createElement("description")    string = doc.createTextNode(description)    desc.appendChild(string)    prop.appendChild(nameP)    prop.appendChild(valueP)    if final:      prop.appendChild(finalP)    prop.appendChild(desc)        return prop  def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr, mrSysDir,\             mapredAddr=None, clientParams=None, serverParams=None,\             finalServerParams=None, clusterFactor=None):    if not mapredAddr:      mapredAddr = "dummy:8181"        implementation = getDOMImplementation()    doc = implementation.createDocument('', 'configuration', None)    comment = doc.createComment(      "This is an auto generated hadoop-site.xml, do not modify")    topElement = doc.documentElement    topElement.appendChild(comment)    description = {}    paramsDict = {  'mapred.job.tracker'    : mapredAddr , \                    'fs.default.name'       : "hdfs://" + hdfsAddr, \                    'hadoop.tmp.dir'        : tempDir, \                    'dfs.client.buffer.dir' : os.path.join(tempDir, 'dfs',                                                                    'tmp'),                 }    paramsDict['mapred.system.dir'] = mrSysDir        # mapred-default.xml is no longer used now.    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))    paramsDict['mapred.reduce.tasks'] = str(numred)    # end    # for all the above vars generated, set the description    for k, v in paramsDict.iteritems():      description[k] = 'Hod generated parameter'    # finalservelParams    if finalServerParams:      for k, v in finalServerParams.iteritems():        if not description.has_key(k):          description[k] = "final server parameter"          paramsDict[k] = v    # servelParams    if serverParams:      for k, v in serverParams.iteritems():        if not description.has_key(k):          # if no final value for same param is mentioned          description[k] = "server parameter"          paramsDict[k] = v    # clientParams    if clientParams:      for k, v in clientParams.iteritems():        if not description.has_key(k) or description[k] == "server parameter":          # Just add, if no final value for same param is mentioned.          # Replace even if server param is mentioned for same config variable          description[k] = "client-side parameter"          paramsDict[k] = v        # generate the xml elements    for k,v in paramsDict.iteritems():      if ( description[k] == "final server parameter" or \                             description[k] == "Hod generated parameter" ):          final = True      else: final = False      prop = self.__create_xml_element(doc, k, v, description[k], final)      topElement.appendChild(prop)    siteName = os.path.join(confDir, "hadoop-site.xml")    sitefile = file(siteName, 'w')    print >> sitefile, topElement.toxml()    sitefile.close()class hadoopCluster:  def __init__(self, cfg, log):    self.__cfg = cfg    self.__log = log    self.__changedClusterParams = []        self.__hostname = local_fqdn()        self.__svcrgyClient = None    self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'],                                                self.__cfg, self.__log)            self.__hadoopCfg = hadoopConfig()    self.jobId = None    self.mapredInfo = None    self.hdfsInfo = None    self.ringmasterXRS = None  def __get_svcrgy_client(self):    svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])    return hodXRClient(svcrgyUrl)  def __get_service_status(self):    serviceData = self.__get_service_data()        status = True    hdfs = False    mapred = False        for host in serviceData.keys():      for item in serviceData[host]:        service = item.keys()        if service[0] == 'hdfs.grid' and \          self.__cfg['gridservice-hdfs']['external'] == False:          hdfs = True        elif service[0] == 'mapred.grid':          mapred = True        if not mapred:      status = "mapred"        if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:      if status != True:        status = "mapred and hdfs"      else:        status = "hdfs"          return status    def __get_service_data(self):    registry = to_http_url(self.__cfg['hod']['xrs-address'])    serviceData = self.__svcrgyClient.getServiceInfo(      self.__cfg['hod']['userid'], self.__setup.np.getNodePoolId())        return serviceData    def __check_job_status(self):    initWaitCount = 20    count = 0    status = False    state = 'Q'    userLimitsFirstFlag = True    while state == 'Q':      if hodInterrupt.isSet():        raise HodInterruptException()      jobInfo = self.__nodePool.getJobInfo()      state = jobInfo['job_state']      if (state==False) or (state!='Q'):        break      count = count + 1      if count < initWaitCount:        time.sleep(0.5)      else:        time.sleep(10)      if self.__cfg['hod'].has_key('job-feasibility-attr') and \                      self.__cfg['hod']['job-feasibility-attr']:        (status, msg) = self.__isJobFeasible()        if status == "Never":          self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \                "This cluster cannot be allocated now.")          return -1        elif status == False:          if userLimitsFirstFlag:            self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \                "This cluster allocation will succeed only after other " + \                "clusters are deallocated.")            userLimitsFirstFlag = False       if state and state != 'C':      status = True        return status  def __isJobFeasible(self):    return self.__nodePool.isJobFeasible()    def __get_ringmaster_client(self):    ringmasterXRS = None       ringList = self.__svcrgyClient.getServiceInfo(      self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),       'ringmaster', 'hod')    if ringList and len(ringList):      if isinstance(ringList, list):        ringmasterXRS = ringList[0]['xrs']    else:          count = 0      waitTime = self.__cfg['hod']['allocate-wait-time']        while count < waitTime:        if hodInterrupt.isSet():          raise HodInterruptException()        ringList = self.__svcrgyClient.getServiceInfo(          self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),           'ringmaster',           'hod')                if ringList and len(ringList):          if isinstance(ringList, list):                    ringmasterXRS = ringList[0]['xrs']                if ringmasterXRS is not None:          break        else:          time.sleep(1)          count = count + 1          # check to see if the job exited by any chance in that time:          if (count % 10 == 0):            if not self.__check_job_status():              break    return ringmasterXRS   def __init_hadoop_service(self, serviceName, xmlrpcClient):    status = True    serviceAddress = None    serviceInfo = None     for i in range(0, 250):       try:        if hodInterrupt.isSet():            raise HodInterruptException()        serviceAddress = xmlrpcClient.getServiceAddr(serviceName)        if serviceAddress:          if serviceAddress == 'not found':            time.sleep(.5)          # check to see if the job exited by any chance in that time:            if (i % 10 == 0):              if not self.__check_job_status():                break          else:            serviceInfo = xmlrpcClient.getURLs(serviceName)                       break       except HodInterruptException,h :        raise h      except:        self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)        self.__log.debug(get_exception_string())        status = False        break        if serviceAddress == 'not found' or not serviceAddress:      self.__log.critical("Failed to retrieve '%s' service address." %                           serviceName)      status = False    elif serviceAddress.startswith("Error: "):      errs = serviceAddress[len("Error: "):]      self.__log.critical("Cluster could not be allocated because of the following errors.\n%s" % \                             errs)      status = False    else:      try:        self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'],                                             self.jobId, self.__hostname,                                             serviceName, 'grid', serviceInfo)              except HodInterruptException, h:        raise h      except:        self.__log.critical("'%s': registry xmlrpc error." % serviceName)            self.__log.debug(get_exception_string())        status = False            return status, serviceAddress, serviceInfo  def __collect_jobtracker_ui(self, dir):     link = self.mapredInfo + "/jobtracker.jsp"     parser = miniHTMLParser()     parser.setBaseUrl(self.mapredInfo)     node_cache = {}     self.__log.debug("collect_jobtracker_ui seeded with " + link)     def alarm_handler(number, stack):         raise AlarmException("timeout")            signal.signal(signal.SIGALRM, alarm_handler)     input = None     while link:       self.__log.debug("link: %s" % link)       # taskstats.jsp,taskdetails.jsp not included since too many to collect       if re.search(         "jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp",          link):         for i in range(1,5):           if hodInterrupt.isSet():             raise HodInterruptException()           try:             input = urllib.urlopen(link)             break           except:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -