⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ringmaster.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
#Licensed to the Apache Software Foundation (ASF) under one#or more contributor license agreements.  See the NOTICE file#distributed with this work for additional information#regarding copyright ownership.  The ASF licenses this file#to you under the Apache License, Version 2.0 (the#"License"); you may not use this file except in compliance#with the License.  You may obtain a copy of the License at#     http://www.apache.org/licenses/LICENSE-2.0#Unless required by applicable law or agreed to in writing, software#distributed under the License is distributed on an "AS IS" BASIS,#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#See the License for the specific language governing permissions and#limitations under the License.#!/usr/bin/env python"""manages services and nodepool"""# -*- python -*-import os, sys, random, time, sets, shutil, threadingimport urllib, urlparse, re, getpass, pprint, signal, shutilfrom pprint import pformatfrom HTMLParser import HTMLParserbinfile = sys.path[0]libdir = os.path.dirname(binfile)sys.path.append(libdir)import hodlib.Common.loggerfrom hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatusfrom hodlib.Common.threads import func from hodlib.Hod.nodePool import *from hodlib.Common.util import *from hodlib.Common.nodepoolutil import NodePoolUtilfrom hodlib.Common.socketServers import hodXMLRPCServerfrom hodlib.Common.socketServers import threadedHTTPServerfrom hodlib.NodePools import *from hodlib.NodePools.torque import *from hodlib.GridServices import *from hodlib.Common.descGenerator import *from hodlib.Common.xmlrpc import hodXRClientfrom hodlib.Common.miniHTMLParser import miniHTMLParserfrom hodlib.Common.threads import simpleCommandclass ringMasterServer:  """The RPC server that exposes all the master config  changes. Also, one of these RPC servers runs as a proxy  and all the hodring instances register with this proxy"""  instance = None  xmlrpc = None    def __init__(self, cfg, log, logMasterSources, retry=5):    try:      from hodlib.Common.socketServers import twistedXMLRPCServer      ringMasterServer.xmlrpc = twistedXMLRPCServer("",         cfg['ringmaster']['xrs-port-range'])    except ImportError:      log.info("Twisted interface not found. Using hodXMLRPCServer.")      ringMasterServer.xmlrpc = hodXMLRPCServer("",         cfg['ringmaster']['xrs-port-range'])    ringMasterServer.xmlrpc.register_instance(logMasterSources)    self.logMasterSources = logMasterSources    ringMasterServer.xmlrpc.serve_forever()            while not ringMasterServer.xmlrpc.is_alive():      time.sleep(.5)              log.debug('Ringmaster RPC Server at %d' %                  ringMasterServer.xmlrpc.server_address[1])      def startService(ss, cfg, np, log, rm):    logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)    ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)  def stopService():    ringMasterServer.xmlrpc.stop()    def getPort():    return ringMasterServer.instance.port  def getAddress():    return 'http://%s:%d/' % (socket.gethostname(),                               ringMasterServer.xmlrpc.server_address[1])    startService = staticmethod(startService)  stopService = staticmethod(stopService)  getPort = staticmethod(getPort)  getAddress = staticmethod(getAddress)  class _LogMasterSources:  """All the methods that are run by the RPC server are  added into this class """    def __init__(self, serviceDict, cfg, np, log, rm):    self.serviceDict = serviceDict    self.tarSource = []    self.tarSourceLock = threading.Lock()    self.dict = {}    self.count = {}    self.logsourceList = []    self.logsourceListLock = threading.Lock()    self.masterParam = []    self.masterParamLock = threading.Lock()    self.verify = 'none'    self.cmdLock = threading.Lock()    self.cfg = cfg    self.log = log    self.np = np    self.rm = rm     self.hdfsHost = None    self.mapredHost = None    self.maxconnect = self.cfg['ringmaster']['max-connect']    self.log.debug("Using max-connect value %s"%self.maxconnect)     def registerTarSource(self, hostname, url, addr=None):    self.log.debug("registering: " + url)    lock = self.tarSourceLock    lock.acquire()    self.dict[url] = url    self.count[url] = 0    # addr is None when ringMaster himself invokes this method    if addr:      c = self.count[addr]      self.count[addr] = c - 1    lock.release()    if addr:      str = "%s is done" % (addr)      self.log.debug(str)    return url  def getTarList(self,hodring):   # this looks useful    lock = self.tarSourceLock    lock.acquire()    leastkey = None    leastval = -1    for k, v in self.count.iteritems():      if (leastval  == -1):        leastval = v        pass      if (v <= leastval and v < self.maxconnect):        leastkey = k        leastval = v    if (leastkey == None):      url  = 'none'    else:      url = self.dict[leastkey]      self.count[leastkey] = leastval + 1      self.log.debug("%s %d" % (leastkey, self.count[leastkey]))    lock.release()    self.log.debug('sending url ' + url+" to "+hodring)  # this looks useful    return url  def tarDone(self, uri):    str = "%s is done" % (uri)    self.log.debug(str)    lock = self.tarSourceLock    lock.acquire()    c = self.count[uri]    self.count[uri] = c - 1    lock.release()    return uri  def status(self):    return True# FIXME: this code is broken, it relies on a central service registry##  def clusterStart(self, changedClusterParams=[]):#    self.log.debug("clusterStart method invoked.")#    self.dict = {}#    self.count = {}#    try:#      if (len(changedClusterParams) > 0):#        self.log.debug("Updating config.")#        for param in changedClusterParams:#          (key, sep1, val) = param.partition('=')#          (i1, sep2, i2) = key.partition('.')#          try:#            prev = self.cfg[i1][i2]#            self.rm.cfg[i1][i2] = val#            self.cfg[i1][i2] = val#            self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))#          except KeyError, e:#            self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)#        self.log.debug("Regenerating Service Description.")#        dGen = DescGenerator(self.rm.cfg)#        self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()#        self.cfg['servicedesc'] = self.rm.cfg['servicedesc']#  #      self.rm.tar = None#      if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):#        self.rm.download = True#        self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']#        self.log.debug("self.rm.tar=%s" % self.rm.tar)# #      self.rm.cd_to_tempdir()##      self.rm.tarAddress = None #      hostname = socket.gethostname()#      if (self.rm.download):#        self.rm.basename = os.path.basename(self.rm.tar)#        dest = os.path.join(os.getcwd(), self.rm.basename)#        src =  self.rm.tar  #        self.log.debug("cp %s -> %s" % (src, dest))#        shutil.copy(src, dest) #        self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)#        self.registerTarSource(hostname, self.rm.tarAddress)#        self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)#      else:#        self.log.debug("Download not set.")#      #      if (self.rm.tar != None):#        self.cfg['hodring']['download-addr'] = self.rm.tarAddress#        self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress##      sdl = self.rm.cfg['servicedesc']#      workDirs = self.rm.getWorkDirs(self.rm.cfg, True)#      hdfsDesc = sdl['hdfs']#      hdfs = None#      if hdfsDesc.isExternal():#        hdfs = HdfsExternal(hdfsDesc, workDirs)#      else:#        hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)#    #      self.rm.serviceDict[hdfs.getName()] = hdfs#      mrDesc = sdl['mapred']#      mr = None#      if mrDesc.isExternal():#        mr = MapReduceExternal(mrDesc, workDirs)#      else:#        mr = MapReduce(mrDesc, workDirs, 1)#      self.rm.serviceDict[mr.getName()] = mr##      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],#        self.np.getServiceId(), 'hodring', 'hod') #    #      slaveList = ringList#      hdfsringXRAddress = None#      # Start HDFS Master - Step 1#      if not hdfsDesc.isExternal():#        masterFound = False#        for ring in ringList:#          ringXRAddress = ring['xrs']#          if ringXRAddress == None:#            raise Exception("Could not get hodring XML-RPC server address.")#          if  (ringXRAddress.find(self.hdfsHost) != -1):#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)#            hdfsringXRAddress = ringXRAddress#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")#            ringClient.clusterStart()#            masterFound = True #            slaveList.remove(ring)#            break#        if not masterFound:#          raise Exception("HDFS Master host not found")#        while hdfs.getInfoAddrs() == None:#          self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")#          time.sleep(1)##      # Start MAPRED Master - Step 2#      if not mrDesc.isExternal():#        masterFound = False#        for ring in ringList:#          ringXRAddress = ring['xrs']#          if ringXRAddress == None:#            raise Exception("Could not get hodring XML-RPC server address.")#          if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")#            ringClient.clusterStart()#            masterFound = True #            slaveList.remove(ring)#            break#        if not masterFound:#          raise Excpetion("MAPRED Master host not found")#        while mr.getInfoAddrs() == None:#          self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \# mapred.job.tracker.info.port")#          time.sleep(1)##      # Start Slaves - Step 3 #      for ring in slaveList:#          ringXRAddress = ring['xrs']#          if ringXRAddress == None:#            raise Exception("Could not get hodring XML-RPC server address.")#          ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)#          self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")#          ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())#          ring['thread'] = ringThread#          ringThread.start()##      for ring in slaveList:#        ringThread = ring['thread']#        if ringThread == None:#          raise Exception("Could not get hodring thread (Slave).")#        ringThread.join()#        self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")##      # Run Admin Commands on HDFS Master - Step 4#      if not hdfsDesc.isExternal():#        if hdfsringXRAddress == None:#          raise Exception("HDFS Master host not found (to Run Admin Commands)")#        ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)#        self.log.debug("Invoking clusterStart(False) - Admin on "#                       + hdfsringXRAddress + " (HDFS Master)")#        ringClient.clusterStart(False)##    except:#      self.log.debug(get_exception_string())#      return False##    self.log.debug("Successfully started cluster.")#    return True##  def clusterStop(self):#    self.log.debug("clusterStop method invoked.")#    try:#      hdfsAddr = self.getServiceAddr('hdfs')#      if hdfsAddr.find(':') != -1:#        h, p = hdfsAddr.split(':', 1)#        self.hdfsHost = h#        self.log.debug("hdfsHost: " + self.hdfsHost)#      mapredAddr = self.getServiceAddr('mapred')#      if mapredAddr.find(':') != -1:#        h, p = mapredAddr.split(':', 1)#        self.mapredHost = h#        self.log.debug("mapredHost: " + self.mapredHost)#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],#                                                      self.np.getServiceId(),#                                                      'hodring', 'hod')#      for ring in ringList:#        ringXRAddress = ring['xrs']#        if ringXRAddress == None:#          raise Exception("Could not get hodring XML-RPC server address.")

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -