📄 ringmaster.py
字号:
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#!/usr/bin/env python
"""manages services and nodepool"""
# -*- python -*-

import os, sys, random, time, sets, shutil, threading
import urllib, urlparse, re, getpass, pprint, signal, shutil
# getAddress() needs socket.gethostname(); import it explicitly instead of
# relying on one of the star imports below happening to re-export it.
import socket

from pprint import pformat
from HTMLParser import HTMLParser

binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)

import hodlib.Common.logger
from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus

from hodlib.Common.threads import func

from hodlib.Hod.nodePool import *
from hodlib.Common.util import *
from hodlib.Common.nodepoolutil import NodePoolUtil
from hodlib.Common.socketServers import hodXMLRPCServer
from hodlib.Common.socketServers import threadedHTTPServer
from hodlib.NodePools import *
from hodlib.NodePools.torque import *
from hodlib.GridServices import *
from hodlib.Common.descGenerator import *
from hodlib.Common.xmlrpc import hodXRClient
from hodlib.Common.miniHTMLParser import miniHTMLParser
from hodlib.Common.threads import simpleCommand

class ringMasterServer:
  """The RPC server that exposes all the master config changes.

  Also, one of these RPC servers runs as a proxy and all the hodring
  instances register with this proxy.

  A single server is tracked per process: startService() stores the
  ringMasterServer object in the class attribute `instance`, __init__
  stores the underlying XML-RPC server in the class attribute `xmlrpc`,
  and the static helpers below read those attributes back.
  """
  instance = None
  xmlrpc = None

  def __init__(self, cfg, log, logMasterSources, retry=5):
    """Create, register and start the XML-RPC server.

    cfg              -- configuration; cfg['ringmaster']['xrs-port-range']
                        is passed to the server constructor.
    log              -- logger used for info/debug messages.
    logMasterSources -- object whose methods are exposed over XML-RPC.
    retry            -- accepted for compatibility; not used here.
    """
    # Prefer the Twisted-based server; fall back to hodXMLRPCServer when
    # the Twisted interface cannot be imported.
    try:
      from hodlib.Common.socketServers import twistedXMLRPCServer
      ringMasterServer.xmlrpc = twistedXMLRPCServer("",
        cfg['ringmaster']['xrs-port-range'])
    except ImportError:
      log.info("Twisted interface not found. Using hodXMLRPCServer.")
      ringMasterServer.xmlrpc = hodXMLRPCServer("",
        cfg['ringmaster']['xrs-port-range'])

    ringMasterServer.xmlrpc.register_instance(logMasterSources)
    self.logMasterSources = logMasterSources
    ringMasterServer.xmlrpc.serve_forever()

    # NOTE(review): serve_forever() is presumably non-blocking here (it is
    # followed by this readiness poll) -- confirm in hodlib socketServers.
    while not ringMasterServer.xmlrpc.is_alive():
      time.sleep(.5)

    log.debug('Ringmaster RPC Server at %d' %
              ringMasterServer.xmlrpc.server_address[1])

  def startService(ss, cfg, np, log, rm):
    """Build the _LogMasterSources handler and start the RPC server."""
    logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
    ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)

  def stopService():
    """Stop the running XML-RPC server."""
    ringMasterServer.xmlrpc.stop()

  def getPort():
    """Return the TCP port the XML-RPC server is bound to."""
    # The ringMasterServer object never gets a `port` attribute, so the old
    # `ringMasterServer.instance.port` always raised AttributeError. Read
    # the bound address, as __init__ and getAddress() already do.
    return ringMasterServer.xmlrpc.server_address[1]

  def getAddress():
    """Return the server URL as 'http://<local hostname>:<port>/'."""
    return 'http://%s:%d/' % (socket.gethostname(),
                              ringMasterServer.xmlrpc.server_address[1])

  startService = staticmethod(startService)
  stopService = staticmethod(stopService)
  getPort = staticmethod(getPort)
  getAddress = staticmethod(getAddress)

class _LogMasterSources:
  """All the methods that are run by the RPC server are added into this class.

  Tarball-source bookkeeping (all guarded by tarSourceLock):
    dict  -- maps each registered source URL to itself.
    count -- maps each registered source URL to the number of downloads
             handed out by getTarList() and not yet reported finished via
             tarDone() / registerTarSource(addr=...).
  """

  def __init__(self, serviceDict, cfg, np, log, rm):
    self.serviceDict = serviceDict
    self.tarSource = []
    self.tarSourceLock = threading.Lock()
    self.dict = {}
    self.count = {}
    # NOTE(review): the lists/locks below and `verify` are not used by the
    # methods visible in this listing; presumably used further down.
    self.logsourceList = []
    self.logsourceListLock = threading.Lock()
    self.masterParam = []
    self.masterParamLock = threading.Lock()
    self.verify = 'none'
    self.cmdLock = threading.Lock()
    self.cfg = cfg
    self.log = log
    self.np = np
    self.rm = rm
    self.hdfsHost = None
    self.mapredHost = None
    # Upper bound on concurrent downloads handed out per source by getTarList().
    self.maxconnect = self.cfg['ringmaster']['max-connect']
    self.log.debug("Using max-connect value %s"%self.maxconnect)

  def registerTarSource(self, hostname, url, addr=None):
    """Register url as a new tarball source with zero active downloads.

    hostname -- accepted but not used.
    url      -- source URL to register; it is also the return value.
    addr     -- when given, the source whose active-download count is
                decremented (the caller finished downloading from it).
                addr is None when the ringmaster itself calls this method.

    Raises KeyError if addr was never registered; the lock is still released.
    """
    self.log.debug("registering: " + url)
    lock = self.tarSourceLock
    lock.acquire()
    try:
      self.dict[url] = url
      self.count[url] = 0
      # addr is None when ringMaster himself invokes this method
      if addr:
        self.count[addr] = self.count[addr] - 1
    finally:
      # Always release: a KeyError above used to leave the lock held and
      # hang every later call that needs tarSourceLock.
      lock.release()
    if addr:
      self.log.debug("%s is done" % (addr))
    return url

  def getTarList(self, hodring):
    """Hand out the least-loaded tarball source URL.

    Picks, among sources with fewer than maxconnect active downloads, the
    one with the smallest count (ties go to the last one seen), and
    increments its count. Returns the string 'none' when every source is
    at capacity or none is registered. hodring is only used for logging.
    """
    lock = self.tarSourceLock
    lock.acquire()
    try:
      leastkey = None
      leastval = None  # None = no candidate yet (counts can go negative)
      for k, v in self.count.iteritems():
        if v >= self.maxconnect:
          continue
        # '<=' keeps the original tie-breaking: last least-loaded wins.
        if leastval is None or v <= leastval:
          leastkey = k
          leastval = v
      if leastkey is None:
        url = 'none'
      else:
        url = self.dict[leastkey]
        self.count[leastkey] = leastval + 1
        self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
    finally:
      lock.release()
    self.log.debug('sending url ' + url+" to "+hodring)
    return url

  def tarDone(self, uri):
    """Record that one download from uri finished; return uri.

    Raises KeyError if uri was never registered; the lock is still released.
    """
    self.log.debug("%s is done" % (uri))
    lock = self.tarSourceLock
    lock.acquire()
    try:
      self.count[uri] = self.count[uri] - 1
    finally:
      lock.release()
    return uri

  def status(self):
    """Liveness probe; always returns True."""
    return True

# FIXME: this code is broken, it relies on a central service registry
#
#  def clusterStart(self, changedClusterParams=[]):
#    self.log.debug("clusterStart method invoked.")
#    self.dict = {}
#    self.count = {}
#    try:
#      if (len(changedClusterParams) > 0):
#        self.log.debug("Updating config.")
#        for param in changedClusterParams:
#          (key, sep1, val) = param.partition('=')
#          (i1, sep2, i2) = key.partition('.')
#          try:
#            prev = self.cfg[i1][i2]
#            self.rm.cfg[i1][i2] = val
#            self.cfg[i1][i2] = val
#            self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
#          except KeyError, e:
#            self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
#        self.log.debug("Regenerating Service Description.")
#        dGen = DescGenerator(self.rm.cfg)
#        self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
#        self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
#
#      self.rm.tar = None
#      if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
#        self.rm.download = True
#        self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
#        self.log.debug("self.rm.tar=%s" % self.rm.tar)
#
#      self.rm.cd_to_tempdir()
#
#      self.rm.tarAddress = None
#      hostname = socket.gethostname()
#      if (self.rm.download):
#        self.rm.basename = os.path.basename(self.rm.tar)
#        dest = os.path.join(os.getcwd(), self.rm.basename)
#        src = self.rm.tar
#        self.log.debug("cp %s -> %s" % (src, dest))
#        shutil.copy(src, dest)
#        self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
#        self.registerTarSource(hostname, self.rm.tarAddress)
#        self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
#      else:
#        self.log.debug("Download not set.")
#
#      if (self.rm.tar != None):
#        self.cfg['hodring']['download-addr'] = self.rm.tarAddress
#        self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
#
#      sdl = self.rm.cfg['servicedesc']
#      workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
#      hdfsDesc = sdl['hdfs']
#      hdfs = None
#      if hdfsDesc.isExternal():
#        hdfs = HdfsExternal(hdfsDesc, workDirs)
#      else:
#        hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
#
#      self.rm.serviceDict[hdfs.getName()] = hdfs
#      mrDesc = sdl['mapred']
#      mr = None
#      if mrDesc.isExternal():
#        mr = MapReduceExternal(mrDesc, workDirs)
#      else:
#        mr = MapReduce(mrDesc, workDirs, 1)
#      self.rm.serviceDict[mr.getName()] = mr
#
#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
#        self.np.getServiceId(), 'hodring', 'hod')
#
#      slaveList = ringList
#      hdfsringXRAddress = None
#      # Start HDFS Master - Step 1
#      if not hdfsDesc.isExternal():
#        masterFound = False
#        for ring in ringList:
#          ringXRAddress = ring['xrs']
#          if ringXRAddress == None:
#            raise Exception("Could not get hodring XML-RPC server address.")
#          if (ringXRAddress.find(self.hdfsHost) != -1):
#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
#            hdfsringXRAddress = ringXRAddress
#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
#            ringClient.clusterStart()
#            masterFound = True
#            slaveList.remove(ring)
#            break
#        if not masterFound:
#          raise Exception("HDFS Master host not found")
#        while hdfs.getInfoAddrs() == None:
#          self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
#          time.sleep(1)
#
#      # Start MAPRED Master - Step 2
#      if not mrDesc.isExternal():
#        masterFound = False
#        for ring in ringList:
#          ringXRAddress = ring['xrs']
#          if ringXRAddress == None:
#            raise Exception("Could not get hodring XML-RPC server address.")
#          if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
#            ringClient.clusterStart()
#            masterFound = True
#            slaveList.remove(ring)
#            break
#        if not masterFound:
#          raise Exception("MAPRED Master host not found")
#        while mr.getInfoAddrs() == None:
#          self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \
# mapred.job.tracker.info.port")
#          time.sleep(1)
#
#      # Start Slaves - Step 3
#      for ring in slaveList:
#        ringXRAddress = ring['xrs']
#        if ringXRAddress == None:
#          raise Exception("Could not get hodring XML-RPC server address.")
#        ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
#        self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
#        # NOTE(review): functionRef receives the *result* of clusterStart()
#        # (the call runs synchronously here); if revived, pass
#        # ringClient.clusterStart without parentheses.
#        ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
#        ring['thread'] = ringThread
#        ringThread.start()
#
#      for ring in slaveList:
#        ringThread = ring['thread']
#        if ringThread == None:
#          raise Exception("Could not get hodring thread (Slave).")
#        ringThread.join()
#        self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
#
#      # Run Admin Commands on HDFS Master - Step 4
#      if not hdfsDesc.isExternal():
#        if hdfsringXRAddress == None:
#          raise Exception("HDFS Master host not found (to Run Admin Commands)")
#        ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
#        self.log.debug("Invoking clusterStart(False) - Admin on "
#                       + hdfsringXRAddress + " (HDFS Master)")
#        ringClient.clusterStart(False)
#
#    except:
#      self.log.debug(get_exception_string())
#      return False
#
#    self.log.debug("Successfully started cluster.")
#    return True
#
#  def clusterStop(self):
#    self.log.debug("clusterStop method invoked.")
#    try:
#      hdfsAddr = self.getServiceAddr('hdfs')
#      if hdfsAddr.find(':') != -1:
#        h, p = hdfsAddr.split(':', 1)
#        self.hdfsHost = h
#        self.log.debug("hdfsHost: " + self.hdfsHost)
#      mapredAddr = self.getServiceAddr('mapred')
#      if mapredAddr.find(':') != -1:
#        h, p = mapredAddr.split(':', 1)
#        self.mapredHost = h
#        self.log.debug("mapredHost: " + self.mapredHost)
#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
#                                                      self.np.getServiceId(),
#                                                      'hodring', 'hod')
#      for ring in ringList:
#        ringXRAddress = ring['xrs']
#        if ringXRAddress == None:
#          raise Exception("Could not get hodring XML-RPC server address.")
# NOTE(review): the listing this file was recovered from is truncated here.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -