📄 rawserver_twisted.py
字号:
self.associated = False self.single_sockets = set() self.udp_sockets = set() self.listened = False self.connections = 0 ############################################################## if profile: try: os.unlink(prof_file_name) except: pass self.prof = hotshot.Profile(prof_file_name) callLater = reactor.callLater def _profile_call(delay, _f, *a, **kw): return callLater(delay, self.prof.runcall, _f, *a, **kw) reactor.callLater = _profile_call o = reactor.callFromThread def _profile_call2(_f, *a, **kw): return o(self.prof.runcall, _f, *a, **kw) reactor.callFromThread = _profile_call2 ############################################################## connectionRateLimitReactor(reactor, self.config.get('max_incomplete', 10)) # bleh self.add_pending_connection = reactor.add_pending_connection self.remove_pending_connection = reactor.remove_pending_connection self.reactor = reactor self.factory = ConnectionFactory() self.factory.rawserver = self self.factory.protocol = CallbackProtocol #l2 = task.LoopingCall(self._print_connection_count) #l2.start(1) ############################################################## def _print_connection_count(self): def _sl(x): if hasattr(x, "__len__"): return str(len(x)) else: return str(x) c = len(self.single_sockets) u = len(self.udp_sockets) c -= u #s = "Connections(" + str(id(self)) + "): tcp(" + str(c) + ") upd(" + str(u) + ")" #rawserver_logger.debug(s) d = dict() for s in self.single_sockets: state = "None" if not s.dying and s.transport: try: state = s.transport.state except: state = "has transport" else: state = "No transport" if state not in d: d[state] = 0 d[state] += 1 #rawserver_logger.debug(d) print d sizes = "cc(" + _sl(self.connections) sizes += ") ss(" + _sl(self.single_sockets) sizes += ") us(" + _sl(self.udp_sockets) + ")" #rawserver_logger.debug(sizes) print sizes ############################################################## def get_remote_endpoints(self): addrs = [(s.ip, s.port) for s in self.single_sockets] return addrs## def add_task(self, delay, _f, *args, **kwargs):## """Schedule the passed function 'func' to be called after## 'delay' seconds and pass the 'args'.#### This should only be called by RawServer's thread."""## #assert thread.get_ident() == self.ident## return reactor.callLater(delay, _f, *args, **kwargs) add_task = reactor.callLater def external_add_task(self, delay, _f, *args, **kwargs): """Schedule the passed function 'func' to be called after 'delay' seconds and pass 'args'. This should be called by threads other than RawServer's thread.""" if delay == 0: return reactor.callFromThread(_f, *args, **kwargs) else: return reactor.callFromThread(reactor.callLater, delay, _f, *args, **kwargs) def create_unixserversocket(self, filename): s = SocketRequestProxy(0, filename, 0, 'unix') s.listening_port = reactor.listenUNIX(s.bind, self.factory) s.listening_port.listening = True return s def create_serversocket(self, port, bind='', tos=0): s = SocketRequestProxy(port, bind, tos, 'tcp') try: s.listening_port = reactor.listenTCP(s.port, self.factory, interface=s.bind) except error.CannotListenError, e: if e[0] != 0: raise e.socketError else: raise s.listening_port.listening = True return s def _create_udpsocket(self, port, bind, tos, create_func): s = SocketRequestProxy(port, bind, tos, 'udp') protocol = CallbackDatagramProtocol() c = ConnectionWrapper(None, None, None, tos) s.connection = c protocol.connection = c try: s.listening_port = create_func(s.port, protocol, interface=s.bind) except error.CannotListenError, e: raise e.socketError s.listening_port.listening = True return s def create_udpsocket(self, port, bind='', tos=0): return self._create_udpsocket(port, bind, tos, create_func = reactor.listenUDP) def create_multicastsocket(self, port, bind='', tos=0): return self._create_udpsocket(port, bind, tos, create_func = reactor.listenMulticast) def _start_listening(self, s): if not s.listening_port.listening: s.listening_port.startListening() s.listening_port.listening = True def _get_data_key(self, serversocket): if serversocket.protocol == 'tcp': key = serversocket.port elif serversocket.protocol == 'unix': key = serversocket.bind else: raise TypeError("Unknown protocol: " + str(serversocket.protocol)) return key def start_listening(self, serversocket, handler, context=None): self.factory.add_connection_data(self._get_data_key(serversocket), (self, handler, context, serversocket.tos)) self._start_listening(serversocket) def start_listening_udp(self, serversocket, handler, context=None): c = serversocket.connection c.post_init(self, handler, context) self._start_listening(serversocket) self.udp_sockets.add(c) start_listening_multicast = start_listening_udp def stop_listening(self, serversocket): listening_port = serversocket.listening_port try: listening_port.stopListening() except AttributeError: # AttributeError: 'MulticastPort' object has no attribute 'handle_disconnected_stopListening' # sigh. pass listening_port.listening = False if serversocket.protocol != 'udp': self.factory.pop_connection_data(self._get_data_key(serversocket)) def stop_listening_udp(self, serversocket): self.stop_listening(serversocket) self.udp_sockets.remove(serversocket.connection) self.single_sockets.remove(serversocket.connection) stop_listening_multicast = stop_listening_udp def start_connection(self, dns, handler, context=None, do_bind=True): addr = dns[0] port = int(dns[1]) if len(letters.intersection(addr)) > 0: rawserver_logger.warning("Don't pass host names to RawServer") # this blocks, that's why we throw the warning addr = socket.gethostbyname(addr) bindaddr = None if do_bind: bindaddr = self.config['bind'] if isinstance(bindaddr, str) and len(bindaddr) >= 0: bindaddr = (bindaddr, 0) else: bindaddr = None c = ConnectionWrapper(self, handler, context, self.tos) self.factory.add_connection_data((addr, port), c) connector = reactor.connectTCP(addr, port, self.factory, bindAddress=bindaddr) c.connector = connector self.single_sockets.add(c) return c def associate_thread(self): assert not self.associated, \ "RawServer has already been associated with a thread" self.ident = thread.get_ident() reactor.ident = self.ident self.associated = True def listen_forever(self, doneflag=None): """Main event processing loop for RawServer. RawServer listens until the doneFlag is set by some other thread. The doneFlag tells all threads to clean-up and then exit.""" if not doneflag: doneflag = DeferredEvent() assert isinstance(doneflag, DeferredEvent) self.doneflag = doneflag if not self.associated: self.associate_thread() if self.listened: Exception(_("listen_forever() should only be called once per reactor.")) if main_thread == thread.get_ident() and not self.sigint_installed: self.install_sigint_handler() if iocpreactor and main_thread == thread.get_ident(): def pulse(): self.add_task(1, pulse) pulse() reactor.callLater(0, self.doneflag.addCallback, self._safestop) self.listened = True reactor.suggestThreadPoolSize(3) if noSignals: reactor.run(installSignalHandlers=False) else: reactor.run() if profile: self.prof.close() stats = hotshot.stats.load(prof_file_name) stats.strip_dirs() stats.sort_stats('time', 'calls') print "Rawserver MainLoop Profile:" stats.print_stats(20) def listen_once(self, period=1e9): rawserver_logger.warning(_("listen_once() might not return until there is activity, and might not process the event you want. Use listen_forever().")) reactor.iterate(period) def stop(self): if self.doneflag and not self.doneflag.isSet(): self.doneflag.set() def _safestop(self, r=None): if not threadable.isInIOThread(): self.external_add_task(0, self._stop) else: self._stop() def _stop(self, r=None): assert thread.get_ident() == self.ident connections = list(self.single_sockets) for connection in connections: try: connection.close() except: pass reactor.suggestThreadPoolSize(0) try: reactor.stop() except RuntimeError: # exceptions.RuntimeError: can't stop reactor that isn't running pass def _remove_socket(self, s, was_connected=False): # opt-out if not s.dying: self._make_wrapped_call(s.handler.connection_lost, s, wrapper=s) s._cleanup() self.single_sockets.remove(s) if was_connected: self.connections -= 1 def connectionMade(self, s): self.connections += 1
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -