⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hodsvc.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
字号:
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
#
#------------------------------------------------------------------------------
import os
import time
import shutil
import xmlrpclib
import socket
import pprint

from signal import *

from hodlib.Common.logger import hodLog, hodDummyLogger
from hodlib.Common.socketServers import hodXMLRPCServer
from hodlib.Common.util import local_fqdn
from hodlib.Common.xmlrpc import hodXRClient


class hodBaseService:
    """Base class for HOD services.

    Provides logging setup, signal handling, service registration and an
    XML-RPC server.  Extend this class to create a HOD service.  Every
    method whose name starts with ``_xr_method_`` is registered with the
    XML-RPC server under the name that follows that prefix (for example
    ``_xr_method_stop`` is exposed as ``stop``).
    """

    def __init__(self, name, config, xrtype='threaded'):
        """Set up logging, signal handlers and the XML-RPC server.

        name   -- service name string; signal handlers are installed for
                  every name except 'serviceRegistry'.
        config -- config object of type hodlib.Common.setup.options or
                  hodlib.Common.setup.config (accessed via has_key() and
                  item lookup).
        xrtype -- XML-RPC server flavour: 'threaded' or 'twisted'.
        """
        self.name = name
        self.hostname = local_fqdn()
        self._cfg = config
        self._xrc = None
        self.logs = {}
        self._baseLogger = None
        self._serviceID = os.getenv('PBS_JOBID')

        self.__logDir = None
        # NOTE(review): never assigned a non-None value anywhere in this
        # class, so _register_service() is a no-op unless something sets
        # the name-mangled attribute from outside -- confirm intent.
        self.__svcrgy = None
        self.__stop = False
        self.__xrtype = xrtype

        self._init_logging()

        if name != 'serviceRegistry':
            self._init_signals()

        self._init_xrc_server()

    def __set_logging_level(self, level):
        """Apply `level` to every known logger via the 'main' logger."""
        self.logs['main'].info("Setting log level to %s." % level)
        # The original read self.loggers, which this class never defines;
        # self.logs is the logger table created in __init__.  An attribute
        # named `loggers` is still honoured if a subclass provides one.
        loggers = getattr(self, 'loggers', self.logs)
        for loggerName in loggers.keys():
            self.logs['main'].set_logger_level(loggerName, level)

    def __get_logging_level(self):
        """Return the current level of the stream or file handler, else 0."""
        # Same self.loggers -> self.logs fallback as __set_logging_level.
        loggers = getattr(self, 'loggers', self.logs)
        if self._cfg.has_key('stream'):
            return loggers['main'].get_level('stream', 'main')
        if self._cfg.has_key('log-dir'):
            return loggers['main'].get_level('file', 'main')
        return 0

    def _xr_method_stop(self, *args):
        """XML-RPC method, calls stop() on ourselves."""
        return self.stop()

    def _xr_method_status(self, *args):
        """XML-RPC method, calls status() on ourselves."""
        return self.status()

    def _init_logging(self):
        """Create self.logs['main'] according to the configuration.

        A real hodLog-backed logger is created only when the 'debug'
        option exists and is greater than 0; otherwise a hodDummyLogger is
        installed.  Stream, file and syslog outputs are attached when the
        'stream', 'log-dir' and 'syslog-address' options are present.
        """
        if not (self._cfg.has_key('debug') and self._cfg['debug'] > 0):
            self.logs['main'] = hodDummyLogger()
            return

        level = self._cfg['debug']
        self._baseLogger = hodLog(self.name)
        self.logs['main'] = self._baseLogger.add_logger('main')

        if self._cfg.has_key('stream') and self._cfg['stream']:
            self._baseLogger.add_stream(level=level,
                                        addToLoggerNames=('main',))

        if self._cfg.has_key('log-dir'):
            # Per-user log directory, suffixed with the job id
            # (PBS_JOBID) when one is available.
            if self._serviceID:
                dirName = "%s.%s" % (self._cfg['userid'], self._serviceID)
            else:
                dirName = self._cfg['userid']
            self.__logDir = os.path.join(self._cfg['log-dir'], dirName)
            if not os.path.exists(self.__logDir):
                os.mkdir(self.__logDir)

            self._baseLogger.add_file(logDirectory=self.__logDir,
                                      level=level,
                                      addToLoggerNames=('main',))

        if self._cfg.has_key('syslog-address'):
            self._baseLogger.add_syslog(self._cfg['syslog-address'],
                                        level=level,
                                        addToLoggerNames=('main',))

    def _init_signals(self):
        """Install signal handlers.

        SIGTERM, SIGQUIT and SIGINT stop the service; SIGUSR2 steps the
        logging level up by one, wrapping from 4 back to 1.
        """
        def toggleLevel():
            currentLevel = self.__get_logging_level()
            if currentLevel == 4:
                self.__set_logging_level(1)
            else:
                self.__set_logging_level(currentLevel + 1)

        # Both handlers go through _sig_wrapper; the original called a
        # nonexistent self.sig_wrapper from sigDebug and from a shadowed
        # duplicate of sigStop.
        def sigStop(sigNum, frame):
            self._sig_wrapper(sigNum, self.stop)

        def sigDebug(sigNum, frame):
            self._sig_wrapper(sigNum, toggleLevel)

        for sigNum in (SIGTERM, SIGQUIT, SIGINT):
            signal(sigNum, sigStop)
        signal(SIGUSR2, sigDebug)

    def _sig_wrapper(self, sigNum, handler, *args):
        """Log the caught signal, then invoke handler with any extra args."""
        self.logs['main'].info("Caught signal %s." % sigNum)
        # Unpack the extra arguments; the original passed the whole tuple
        # as a single positional argument.  With no extras this is simply
        # handler().
        handler(*args)

    def _init_xrc_server(self):
        """Create the XML-RPC server and register the _xr_method_* methods.

        The bind address comes from 'xrs-address' (host, port) or, failing
        that, from 'xrs-port-range' on all interfaces (host '').  When
        neither option is present no server is created and self._xrc stays
        None.

        Raises ValueError when a server is configured but the xrtype given
        to __init__ is neither 'threaded' nor 'twisted'.
        """
        host = None
        ports = None
        if self._cfg.has_key('xrs-address'):
            address = self._cfg['xrs-address']
            host = address[0]
            ports = (address[1],)
        elif self._cfg.has_key('xrs-port-range'):
            host = ''
            ports = self._cfg['xrs-port-range']

        if host is None:
            return

        if self.__xrtype == 'threaded':
            self._xrc = hodXMLRPCServer(host, ports)
        elif self.__xrtype == 'twisted':
            try:
                # Same module the file already imports hodXMLRPCServer
                # from, spelled absolutely for consistency.
                from hodlib.Common.socketServers import twistedXMLRPCServer
                self._xrc = twistedXMLRPCServer(host, ports,
                                                self.logs['main'])
            except ImportError:
                self.logs['main'].error(
                    "Twisted XML-RPC server not available, "
                    "falling back on threaded server.")
                self._xrc = hodXMLRPCServer(host, ports)
        else:
            # Previously this fell through and failed with an opaque
            # AttributeError on None while registering functions.
            raise ValueError("Unknown XML-RPC server type: %r"
                             % (self.__xrtype,))

        prefix = '_xr_method_'
        for attr in dir(self):
            if attr.startswith(prefix):
                self._xrc.register_function(getattr(self, attr),
                                            attr[len(prefix):])
        # One call after the loop instead of one per registered method.
        self._xrc.register_introspection_functions()

    def _register_service(self, port=None, installSignalHandlers=1):
        """Register this service with the service registry, if one is set.

        The registered info maps 'xrs' and/or 'http' to the URLs of
        whichever of self._xrc / self._http are available; an empty dict
        is sent when neither is.

        port                  -- unused; kept for interface compatibility.
        installSignalHandlers -- passed through to hodXRClient.
        """
        if not self.__svcrgy:
            return

        self.logs['main'].info(
            "Registering service with service registery %s... "
            % self.__svcrgy)
        svcrgy = hodXRClient(self.__svcrgy, None, None, 0, 0,
                             installSignalHandlers)

        serviceInfo = {}
        if self._xrc:
            serviceInfo['xrs'] = "http://%s:%s" % (
                self._xrc.server_address[0], self._xrc.server_address[1])

        # _http is not initialised by this class, so read it defensively
        # rather than raising AttributeError when it is absent.
        http = getattr(self, '_http', None)
        if http:
            serviceInfo['http'] = "http://%s:%s" % (
                http.server_address[0], http.server_address[1])

        # The original's no-server branch used the bare name `name`
        # (a NameError); self.name is used for every case now.
        svcrgy.registerService(self._cfg['userid'], self._serviceID,
                               self.hostname, self.name, 'hod',
                               serviceInfo)

    def start(self):
        """Start the XML-RPC server and register the service.

        Registration is attempted only when the 'register' option is
        present and true.
        """
        self.logs['main'].info("Starting HOD service: %s ..." % self.name)

        # NOTE(review): registration runs only after serve_forever()
        # returns; presumably the HOD servers return immediately -- confirm.
        if self._xrc:
            self._xrc.serve_forever()
        if self._cfg.has_key('register') and self._cfg['register']:
            self._register_service()

    def stop(self):
        """Stop the XML-RPC server (if any) and set the stop flag.

        Returns True.
        """
        self.logs['main'].info("Stopping service...")

        if self._xrc:
            self._xrc.stop()
        self.__stop = True

        return True

    def status(self):
        """Returns true, should be overridden."""
        return True

    def wait(self):
        """Wait until stop method is called."""
        # Poll the stop flag set by stop().
        while not self.__stop:
            time.sleep(.1)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -