⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 service.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
字号:
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""defines Service as abstract interface"""

# -*- python -*-
import random
import socket


class Service:
  """Base class that all the other services inherit from.

  Stores the service descriptor and the work directories; every
  node-management hook raises NotImplementedError and must be
  overridden by a concrete service.
  """

  def __init__(self, serviceDesc, workDirs):
    self.serviceDesc = serviceDesc
    self.workDirs = workDirs

  def getName(self):
    """Return the name reported by the service descriptor."""
    return self.serviceDesc.getName()

  def getInfoAddrs(self):
    """Return a list of addresses that provide
    information about the service (empty by default)."""
    return []

  def isLost(self):
    """True if the service is down"""
    raise NotImplementedError

  def addNodes(self, nodeList):
    """add nodeSet"""
    raise NotImplementedError

  def removeNodes(self, nodeList):
    """remove a nodeset"""
    raise NotImplementedError

  def getWorkers(self):
    """Abstract; no contract is documented in this file."""
    raise NotImplementedError

  def needsMore(self):
    """return number of nodes the service wants to add"""
    raise NotImplementedError

  def needsLess(self):
    """return number of nodes the service wants to remove"""
    raise NotImplementedError


class MasterSlave(Service):
  """Base class for a master/slave service architecture.

  Tracks whether the master has been launched and initialized, the
  address it was launched at, and how often (and why) it failed.
  """

  def __init__(self, serviceDesc, workDirs, requiredNode):
    Service.__init__(self, serviceDesc, workDirs)
    self.launchedMaster = False
    self.masterInitialized = False
    self.masterAddress = 'none'
    self.requiredNode = requiredNode
    self.failedMsg = None
    self.masterFailureCount = 0

  def getRequiredNode(self):
    """Return the requiredNode value given to the constructor."""
    return self.requiredNode

  def getMasterRequest(self):
    """ the number of master you need
    to run for this service. """
    raise NotImplementedError

  def isLaunchable(self, serviceDict):
    """ if your service does not depend on
    other services. is set to true by default. """
    return True

  def getMasterCommands(self, serviceDict):
    """ a list of master commands you
    want to run for this service. """
    raise NotImplementedError

  def getAdminCommands(self, serviceDict):
    """ a list of admin commands you
    want to run for this service. """
    raise NotImplementedError

  def getWorkerCommands(self, serviceDict):
    """ a list of worker commands you want to
    run for this service. """
    raise NotImplementedError

  # NOTE: the parameter name 'list' shadows the builtin, but it is part of
  # the public signature (callers may pass it by keyword), so it is kept.
  def setMasterNodes(self, list):
    """ set the status of master nodes
    after they start running on a node cluster. """
    raise NotImplementedError

  def addNodes(self, list):
    """ add nodes to a service. Not implemented
    currently. """
    raise NotImplementedError

  def getMasterAddrs(self):
    """ return the addresses of master. the
    hostname:port to which worker nodes should
    connect. """
    raise NotImplementedError

  def setMasterParams(self, list):
    """ set the various master params
    depending on what each hodring set
    the master params to. """
    raise NotImplementedError

  def setlaunchedMaster(self):
    """ set the status of master launched
    to true. """
    self.launchedMaster = True

  def isMasterLaunched(self):
    """ return if a master has been launched
    for the service or not. """
    return self.launchedMaster

  def isMasterInitialized(self):
    """ return if a master if launched
    has been initialized or not. """
    return self.masterInitialized

  def setMasterInitialized(self):
    """ set the master initialized to
    true. """
    self.masterInitialized = True
    # Reset failure related variables, as master is initialized successfully.
    self.masterFailureCount = 0
    self.failedMsg = None

  def getMasterAddress(self):
    """ it needs to change to reflect
    more than one master. Currently it
    keeps a knowledge of where the master
    was launched and to keep track if it was actually
    up or not. """
    return self.masterAddress

  def setMasterAddress(self, addr):
    """Record the address the master was launched at."""
    self.masterAddress = addr

  def isExternal(self):
    """Return whatever the service descriptor reports for isExternal()."""
    return self.serviceDesc.isExternal()

  def setMasterFailed(self, err):
    """Sets variables related to Master failure"""
    self.masterFailureCount += 1
    self.failedMsg = err
    # When command is sent to HodRings, this would have been set to True.
    # Reset it to reflect the correct status.
    self.launchedMaster = False

  def getMasterFailed(self):
    """Return the last recorded failure message, or None."""
    return self.failedMsg

  def getMasterFailureCount(self):
    """Return the number of failures recorded since the last successful
    initialization."""
    return self.masterFailureCount


class NodeRequest:
  """A class to define a node request.

  Attributes:
    numNodes:    the value passed as n.
    required:    list given as 'required' (fresh empty list by default).
    preferred:   list given as 'preferred' (fresh empty list by default).
    isPreemptee: flag given as 'isPreemptee'.
  """

  def __init__(self, n, required=None, preferred=None, isPreemptee=True):
    # The defaults used to be literal [] objects, which Python evaluates
    # once; every default-constructed request then shared (and could
    # mutate) the same two lists.  Build a fresh list per instance instead.
    if required is None:
      required = []
    if preferred is None:
      preferred = []
    self.numNodes = n
    self.preferred = preferred
    self.isPreemptee = isPreemptee
    self.required = required

  def setNumNodes(self, n):
    """Replace the requested number of nodes."""
    self.numNodes = n

  def setPreferredList(self, list):
    """Replace the preferred list."""
    self.preferred = list

  def setIsPreemptee(self, flag):
    """Replace the isPreemptee flag."""
    self.isPreemptee = flag


class ServiceUtil:
  """ this class should be moved out of
  service.py to a util file"""

  # Ports already handed out by this process, shared by both allocators
  # below so that neither returns the same port twice.
  localPortUsed = {}

  @staticmethod
  def _isPortFree(h, n, log=None):
    """Return True if a TCP socket can be bound to (h, n) right now.

    The probe socket is always closed before returning, including when
    bind() raises something other than socket.error.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      if log: log.debug("Trying to see if port %s is available"% n)
      try:
        s.bind((h, n))
      except socket.error as e:
        if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
        return False
      if log: log.debug("Yes, port %s is available" % n)
      return True
    finally:
      s.close()

  @staticmethod
  def getUniqRandomPort(h=None, low=50000, high=60000, retry=900, log=None):
    """Allocate a random free port between low and high (inclusive).

    Args:
      h:     host/address to bind on; defaults to socket.gethostname().
      low:   lowest candidate port.
      high:  highest candidate port.
      retry: maximum number of candidates to try.
      log:   optional logger; only its debug() method is used.

    Returns:
      The port number, which is also recorded in localPortUsed.

    Raises:
      ValueError: if no free port was found within 'retry' attempts.
    """
    # We use a default value of 900 retries, which takes an agreeable
    # time limit of ~ 6.2 seconds to check 900 ports, in the worse case
    # of no available port in those 900.
    if not h:
      h = socket.gethostname()
    while retry > 0:
      n = random.randint(low, high)
      if n in ServiceUtil.localPortUsed:
        # Count this attempt too: previously a fully reserved range made
        # this loop spin forever because the skip never consumed a retry.
        retry -= 1
        continue
      if ServiceUtil._isPortFree(h, n, log):
        ServiceUtil.localPortUsed[n] = True
        return n
      retry -= 1
    raise ValueError("Can't find unique local port between %d and %d" % (low, high))

  @staticmethod
  def getUniqPort(h=None, low=40000, high=60000, retry=900, log=None):
    """get unique port on a host that can be used by service
    This and its consumer code should disappear when master
    nodes get allocated by nodepool

    Candidates are scanned sequentially from low + 1 up to high
    (inclusive); the first one that is neither reserved in localPortUsed
    nor refused by bind() is reserved and returned.

    Args:
      h:     host/address to bind on; defaults to socket.gethostname().
      low:   the scan starts at low + 1.
      high:  last candidate port.
      retry: maximum number of failed bind attempts tolerated.
      log:   optional logger; only its debug() method is used.

    Raises:
      ValueError: if the range or the retries are exhausted.
    """
    # We use a default value of 900 retries, which takes an agreeable
    # time limit of ~ 6.2 seconds to check 900 ports, in the worse case
    # of no available port in those 900.
    if not h:
      h = socket.gethostname()
    n = low
    # Stop at 'high': the scan used to ignore the upper bound and could run
    # past 65535, where bind() raises OverflowError instead of socket.error.
    while retry > 0 and n < high:
      n = n + 1
      if n in ServiceUtil.localPortUsed:
        continue
      if ServiceUtil._isPortFree(h, n, log):
        ServiceUtil.localPortUsed[n] = True
        return n
      retry -= 1
    raise ValueError("Can't find unique local port between %d and %d" % (low, high))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -