⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketservers.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 2 页
字号:
                        raise socket.error, bindError                else:                    break        else:            self.server_address = (self.host, int(self.ports[0]))            self.socket.bind(self.server_address)                if self.host == '':            self.server_address = (local_fqdn(), self.server_address[1])    def _serve_forever(self):        """Replacement for serve_forever loop.                   All baseSocketServers run within a master thread; that thread           imitates serve_forever, but checks an event (self.__stopForever)            before processing new connections.        """                while not self.__stopForever.isSet():            (rlist, wlist, xlist) = select([self.socket], [], [],                                            1)                        if (len(rlist) > 0 and self.socket == rlist[0]):                self.handle_request()                    while not self.__run.isSet():                if self.__stopForever.isSet():                    break                time.sleep(1)                self.server_close()                return True    def serve_forever(self):        """Handle requests until stopForever event flag indicates stop."""        self.mThread = threading.Thread(name="baseSocketServer",                                         target=self._serve_forever)        self.mThread.start()        return self.mThread    def pause(self):        """Temporarily stop servicing requests."""        self.__run.clear()    def cont(self):        """Resume servicing requests."""        self.__run.set()    def stop(self):        """Set the stopForever flag to tell serve_forever() to exit."""            self.__stopForever.set()        if self.mThread: self.mThread.join()        return True    def is_alive(self):        if self.mThread != None:            return self.mThread.isAlive()        else:            return Falseclass threadedHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer):    def __init__(self, host, ports):        baseSocketServer.__init__(self, host, ports)        HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)class forkingHTTPServer(baseSocketServer, ForkingMixIn, HTTPServer):    def __init__(self, host, ports):        baseSocketServer.__init__(self, host, ports)        HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)class hodHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer):    service = None     def __init__(self, host, ports, serviceobj = None):        self.service = serviceobj        baseSocketServer.__init__(self, host, ports)        HTTPServer.__init__(self, self.server_address, hodHTTPHandler)    def finish_request(self, request, client_address):        self.RequestHandlerClass(request, client_address, self, self.service)        class hodXMLRPCServer(baseSocketServer, ThreadingMixIn, SimpleXMLRPCServer):    def __init__(self, host, ports,                  requestHandler=SimpleXMLRPCRequestHandler,                  logRequests=False, allow_none=False, encoding=None):        baseSocketServer.__init__(self, host, ports)        SimpleXMLRPCServer.__init__(self, self.server_address, requestHandler,                                     logRequests)                self.register_function(self.stop, 'stop')try:    from twisted.web import server, xmlrpc    from twisted.internet import reactor, defer    from twisted.internet.threads import deferToThread    from twisted.python import log                    class twistedXMLRPC(xmlrpc.XMLRPC):        def __init__(self, logger):            xmlrpc.XMLRPC.__init__(self)                        self.__XRMethods = {}            self.__numRequests = 0            self.__logger = logger            self.__pause = False            def render(self, request):            request.content.seek(0, 0)            args, functionPath = xmlrpclib.loads(request.content.read())            try:                function = self._getFunction(functionPath)            except Fault, f:                self._cbRender(f, request)            else:                request.setHeader("content-type", "text/xml")                defer.maybeDeferred(function, *args).addErrback(                    self._ebRender).addCallback(self._cbRender, request)                        return server.NOT_DONE_YET            def _cbRender(self, result, request):            if isinstance(result, xmlrpc.Handler):                result = result.result            if not isinstance(result, Fault):                result = (result,)            try:                s = xmlrpclib.dumps(result, methodresponse=1)            except:                f = Fault(self.FAILURE, "can't serialize output")                s = xmlrpclib.dumps(f, methodresponse=1)            request.setHeader("content-length", str(len(s)))            request.write(s)            request.finish()             def _ebRender(self, failure):            if isinstance(failure.value, Fault):                return failure.value            log.err(failure)            return Fault(self.FAILURE, "error")                def _getFunction(self, methodName):            while self.__pause:                time.sleep(1)                        self.__numRequests = self.__numRequests + 1            function = None            try:                def defer_function(*args):                    return deferToThread(self.__XRMethods[methodName],                                          *args)                function = defer_function                self.__logger.info(                    "[%s] processing defered XML-RPC call to: %s ..." %                     (self.__numRequests, methodName))                        except KeyError:                self.__logger.warn(                    "[%s] fault %s on XML-RPC call to %s, method not found." % (                    self.__numRequests, self.NOT_FOUND, methodName))                raise xmlrpc.NoSuchFunction(self.NOT_FOUND,                                             "method %s not found" % methodName)                        return function                def register_function(self, functionRef, methodName):            self.__XRMethods[methodName] = functionRef                    def list_methods(self):            return self.__XRMethods.keys()                def num_requests(self):            return self.__numRequests                def pause(self):            self.__pause = True                def cont(self):            self.__pause = False                class twistedXMLRPCServer:        def __init__(self, host, ports, logger=None, threadPoolSize=100):            self.__host = host            self.__ports = ports                        if logger == None:                logger = hodDummyLogger()                        self.__logger = logger                            self.server_address = ['', '']            reactor.suggestThreadPoolSize(threadPoolSize)                    self.__stopForever = threading.Event()            self.__stopForever.clear()            self.__mThread = None                            self.__xmlrpc = twistedXMLRPC(self.__logger)                        def _serve_forever(self):            if len(self.__ports) > 1:                randomPort = Random(os.getpid())                portSequence = range(self.__ports[0], self.__ports[1])                    maxTryCount = abs(self.__ports[0] - self.__ports[1])                tryCount = 0                while True:                    somePort = randomPort.choice(portSequence)                    self.server_address = (self.__host, int(somePort))                    if self.__host == '':                        self.server_address = (local_fqdn(), self.server_address[1])                    try:                        reactor.listenTCP(int(somePort), server.Site(                            self.__xmlrpc), interface=self.__host)                        reactor.run(installSignalHandlers=0)                    except:                        self.__logger.debug("Failed to bind to: %s:%s." % (                            self.__host, somePort))                        tryCount = tryCount + 1                        if tryCount > maxTryCount:                            self.__logger.warn("Failed to bind to: %s:%s" % (                                self.__host, self.__ports))                            sys.exit(1)                    else:                        break            else:                try:                    self.server_address = (self.__host, int(self.__ports[0]))                    if self.__host == '':                        self.server_address = (local_fqdn(), self.server_address[1])                    reactor.listenTCP(int(self.__ports[0]), server.Site(self.__xmlrpc),                                       interface=self.__host)                    reactor.run(installSignalHandlers=0)                except:                    self.__logger.warn("Failed to bind to: %s:%s."% (                            self.__host, self.__ports[0]))                    sys.exit(1)                    def serve_forever(self):            """Handle requests until stopForever event flag indicates stop."""                self.__mThread = threading.Thread(name="XRServer",                                              target=self._serve_forever)            self.__mThread.start()                        if not self.__mThread.isAlive():                raise Exception("Twisted XMLRPC server thread dead.")                            def register_function(self, functionRef, methodName):            self.__xmlrpc.register_function(functionRef, methodName)                def register_introspection_functions(self):            pass                def register_instance(self, instance):            for method in dir(instance):                if not method.startswith('_'):                    self.register_function(getattr(instance, method), method)                def pause(self):            self.__xmlrpc.pause()                def cont(self):            self.__xmlrpc.cont()                def stop(self):            def stop_thread():                time.sleep(2)                reactor.stop()                            self.__stopForever.set()                        stopThread = threading.Thread(name='XRStop', target=stop_thread)            stopThread.start()                            return True                    def is_alive(self):            status = False            if reactor.running == 1:                status = True                        return status                def status(self):            """Return status information on running XMLRPC Server."""            stat = { 'XR server address'     : self.server_address,                     'XR methods'            : self.system_listMethods(),                     'XR server alive'       : self.is_alive(),                     'XR requests processed' : self.__xmlrpc.num_requests(),                     'XR server stop flag'   : self.__stopForever.isSet()}            return(stat)                def system_listMethods(self):            return self.__xmlrpc.list_methods()                def get_server_address(self):            waitCount = 0            while self.server_address == '':                if waitCount == 9:                    break                 time.sleep(1)                waitCount = waitCount + 1                            return self.server_addressexcept ImportError:    pass

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -