📄 socketservers.py
字号:
raise socket.error, bindError else: break else: self.server_address = (self.host, int(self.ports[0])) self.socket.bind(self.server_address) if self.host == '': self.server_address = (local_fqdn(), self.server_address[1]) def _serve_forever(self): """Replacement for serve_forever loop. All baseSocketServers run within a master thread; that thread imitates serve_forever, but checks an event (self.__stopForever) before processing new connections. """ while not self.__stopForever.isSet(): (rlist, wlist, xlist) = select([self.socket], [], [], 1) if (len(rlist) > 0 and self.socket == rlist[0]): self.handle_request() while not self.__run.isSet(): if self.__stopForever.isSet(): break time.sleep(1) self.server_close() return True def serve_forever(self): """Handle requests until stopForever event flag indicates stop.""" self.mThread = threading.Thread(name="baseSocketServer", target=self._serve_forever) self.mThread.start() return self.mThread def pause(self): """Temporarily stop servicing requests.""" self.__run.clear() def cont(self): """Resume servicing requests.""" self.__run.set() def stop(self): """Set the stopForever flag to tell serve_forever() to exit.""" self.__stopForever.set() if self.mThread: self.mThread.join() return True def is_alive(self): if self.mThread != None: return self.mThread.isAlive() else: return Falseclass threadedHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer): def __init__(self, host, ports): baseSocketServer.__init__(self, host, ports) HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)class forkingHTTPServer(baseSocketServer, ForkingMixIn, HTTPServer): def __init__(self, host, ports): baseSocketServer.__init__(self, host, ports) HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)class hodHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer): service = None def __init__(self, host, ports, serviceobj = None): self.service = serviceobj baseSocketServer.__init__(self, host, ports) HTTPServer.__init__(self, self.server_address, hodHTTPHandler) def finish_request(self, request, client_address): self.RequestHandlerClass(request, client_address, self, self.service) class hodXMLRPCServer(baseSocketServer, ThreadingMixIn, SimpleXMLRPCServer): def __init__(self, host, ports, requestHandler=SimpleXMLRPCRequestHandler, logRequests=False, allow_none=False, encoding=None): baseSocketServer.__init__(self, host, ports) SimpleXMLRPCServer.__init__(self, self.server_address, requestHandler, logRequests) self.register_function(self.stop, 'stop')try: from twisted.web import server, xmlrpc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread from twisted.python import log class twistedXMLRPC(xmlrpc.XMLRPC): def __init__(self, logger): xmlrpc.XMLRPC.__init__(self) self.__XRMethods = {} self.__numRequests = 0 self.__logger = logger self.__pause = False def render(self, request): request.content.seek(0, 0) args, functionPath = xmlrpclib.loads(request.content.read()) try: function = self._getFunction(functionPath) except Fault, f: self._cbRender(f, request) else: request.setHeader("content-type", "text/xml") defer.maybeDeferred(function, *args).addErrback( self._ebRender).addCallback(self._cbRender, request) return server.NOT_DONE_YET def _cbRender(self, result, request): if isinstance(result, xmlrpc.Handler): result = result.result if not isinstance(result, Fault): result = (result,) try: s = xmlrpclib.dumps(result, methodresponse=1) except: f = Fault(self.FAILURE, "can't serialize output") s = xmlrpclib.dumps(f, methodresponse=1) request.setHeader("content-length", str(len(s))) request.write(s) request.finish() def _ebRender(self, failure): if isinstance(failure.value, Fault): return failure.value log.err(failure) return Fault(self.FAILURE, "error") def _getFunction(self, methodName): while self.__pause: time.sleep(1) self.__numRequests = self.__numRequests + 1 function = None try: def defer_function(*args): return deferToThread(self.__XRMethods[methodName], *args) function = defer_function self.__logger.info( "[%s] processing defered XML-RPC call to: %s ..." % (self.__numRequests, methodName)) except KeyError: self.__logger.warn( "[%s] fault %s on XML-RPC call to %s, method not found." % ( self.__numRequests, self.NOT_FOUND, methodName)) raise xmlrpc.NoSuchFunction(self.NOT_FOUND, "method %s not found" % methodName) return function def register_function(self, functionRef, methodName): self.__XRMethods[methodName] = functionRef def list_methods(self): return self.__XRMethods.keys() def num_requests(self): return self.__numRequests def pause(self): self.__pause = True def cont(self): self.__pause = False class twistedXMLRPCServer: def __init__(self, host, ports, logger=None, threadPoolSize=100): self.__host = host self.__ports = ports if logger == None: logger = hodDummyLogger() self.__logger = logger self.server_address = ['', ''] reactor.suggestThreadPoolSize(threadPoolSize) self.__stopForever = threading.Event() self.__stopForever.clear() self.__mThread = None self.__xmlrpc = twistedXMLRPC(self.__logger) def _serve_forever(self): if len(self.__ports) > 1: randomPort = Random(os.getpid()) portSequence = range(self.__ports[0], self.__ports[1]) maxTryCount = abs(self.__ports[0] - self.__ports[1]) tryCount = 0 while True: somePort = randomPort.choice(portSequence) self.server_address = (self.__host, int(somePort)) if self.__host == '': self.server_address = (local_fqdn(), self.server_address[1]) try: reactor.listenTCP(int(somePort), server.Site( self.__xmlrpc), interface=self.__host) reactor.run(installSignalHandlers=0) except: self.__logger.debug("Failed to bind to: %s:%s." % ( self.__host, somePort)) tryCount = tryCount + 1 if tryCount > maxTryCount: self.__logger.warn("Failed to bind to: %s:%s" % ( self.__host, self.__ports)) sys.exit(1) else: break else: try: self.server_address = (self.__host, int(self.__ports[0])) if self.__host == '': self.server_address = (local_fqdn(), self.server_address[1]) reactor.listenTCP(int(self.__ports[0]), server.Site(self.__xmlrpc), interface=self.__host) reactor.run(installSignalHandlers=0) except: self.__logger.warn("Failed to bind to: %s:%s."% ( self.__host, self.__ports[0])) sys.exit(1) def serve_forever(self): """Handle requests until stopForever event flag indicates stop.""" self.__mThread = threading.Thread(name="XRServer", target=self._serve_forever) self.__mThread.start() if not self.__mThread.isAlive(): raise Exception("Twisted XMLRPC server thread dead.") def register_function(self, functionRef, methodName): self.__xmlrpc.register_function(functionRef, methodName) def register_introspection_functions(self): pass def register_instance(self, instance): for method in dir(instance): if not method.startswith('_'): self.register_function(getattr(instance, method), method) def pause(self): self.__xmlrpc.pause() def cont(self): self.__xmlrpc.cont() def stop(self): def stop_thread(): time.sleep(2) reactor.stop() self.__stopForever.set() stopThread = threading.Thread(name='XRStop', target=stop_thread) stopThread.start() return True def is_alive(self): status = False if reactor.running == 1: status = True return status def status(self): """Return status information on running XMLRPC Server.""" stat = { 'XR server address' : self.server_address, 'XR methods' : self.system_listMethods(), 'XR server alive' : self.is_alive(), 'XR requests processed' : self.__xmlrpc.num_requests(), 'XR server stop flag' : self.__stopForever.isSet()} return(stat) def system_listMethods(self): return self.__xmlrpc.list_methods() def get_server_address(self): waitCount = 0 while self.server_address == '': if waitCount == 9: break time.sleep(1) waitCount = waitCount + 1 return self.server_addressexcept ImportError: pass
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -