📄 rawserver_twisted.py
字号:
if old_buffer._buffer_list: self.consumer.writeSequence(old_buffer._buffer_list) old_buffer._buffer_list[:] = [] def add(self, b): self._is_flushed = False self.consumer.write(b) def stopWriting(self): pass def is_flushed(self): return self._is_flushed def _flushed(self): self._is_flushed = True self.callback_onflushed() class OutputBuffer(object): # This is an IPullProducer which has an unlimited buffer size, # and calls a callback when the buffer is completely flushed def __init__(self, callback_onflushed): self.consumer = None self.registered = False self.callback_onflushed = callback_onflushed self._buffer_list = [] def is_flushed(self): return (len(self._buffer_list) == 0) def attachConsumer(self, consumer): self.consumer = consumer if not self.registered: self.beginWriting() def add(self, b): self._buffer_list.append(b) if self.consumer and not self.registered: self.beginWriting() def beginWriting(self): self.stopWriting() assert self.consumer, "You must attachConsumer before you beginWriting" self.registered = True self.consumer.registerProducer(self, False) def stopWriting(self): if not self.registered: return self.registered = False if self.consumer: try: self.consumer.unregisterProducer() except KeyError: # bug in iocpreactor: http://twistedmatrix.com/trac/ticket/1657 pass def resumeProducing(self): if not self.registered: return assert self.consumer, "You must attachConsumer before you resumeProducing" to_write = len(self._buffer_list) if to_write > 0: self.consumer.writeSequence(self._buffer_list) del self._buffer_list[:] self.callback_onflushed() else: self.stopWriting() def pauseProducing(self): pass def stopProducing(self): passclass CallbackConnection(object): def attachTransport(self, transport, s): s.attach_transport(self, transport=transport, reset_timeout=self.optionalResetTimeout) self.connection = s def connectionMade(self): s = self.connection s.handler.connection_made(s) self.optionalResetTimeout() self.factory.rawserver.connectionMade(s) def connectionLost(self, reason): reactor.callLater(0, self.post_connectionLost, reason) # twisted api inconsistancy workaround # sometimes connectionLost is called (not queued) from inside write() def post_connectionLost(self, reason): s = self.connection #print s.ip, s.port, reason.getErrorMessage() self.factory.rawserver._remove_socket(self.connection, was_connected=True) def dataReceived(self, data): self.optionalResetTimeout() s = self.connection s.rawserver._make_wrapped_call(s.handler.data_came_in, s, data, wrapper=s) def datagramReceived(self, data, (host, port)): s = self.connection s.rawserver._make_wrapped_call(s.handler.data_came_in, (host, port), data, wrapper=s) def optionalResetTimeout(self): if self.can_timeout: self.resetTimeout()class CallbackProtocol(CallbackConnection, TimeoutMixin, Protocol): def makeConnection(self, transport): self.can_timeout = True self.setTimeout(self.factory.rawserver.config['socket_timeout']) outgoing = False key = None host = transport.getHost() if isinstance(host, address.UNIXAddress): key = host.name else: key = host.port try: addr = (transport.getPeer().host, transport.getPeer().port) c = self.factory.pop_connection_data(addr) outgoing = True except KeyError: # happens when this was an incoming connection pass except AttributeError: # happens when the peer is a unix socket pass if not outgoing: args = self.factory.get_connection_data(key) c = ConnectionWrapper(*args) self.attachTransport(transport, c) Protocol.makeConnection(self, transport)class CallbackDatagramProtocol(CallbackConnection, DatagramProtocol): def startProtocol(self): self.can_timeout = False self.attachTransport(self.transport, self.connection) DatagramProtocol.startProtocol(self)class ConnectionFactory(ClientFactory): def __init__(self): self.connection_data = DictWithLists() def add_connection_data(self, key, data): self.connection_data.push_to_row(key, data) def get_connection_data(self, key): return self.connection_data.get_from_row(key) def pop_connection_data(self, key): return self.connection_data.pop_from_row(key) def clientConnectionFailed(self, connector, reason): peer = connector.getDestination() addr = (peer.host, peer.port) s = self.pop_connection_data(addr) # opt-out if not s.dying: # this might not work - reason is not an exception s.handler.connection_failed(addr, reason) s.dying = True self.rawserver._remove_socket(s)# storage for socket creation requestions, and proxy once the connection is madeclass SocketRequestProxy(object): def __init__(self, port, bind, tos, protocol): self.port = port self.bind = bind self.tos = tos self.protocol = protocol self.connection = None def __getattr__(self, name): try: return getattr(self.connection, name) except: raise AttributeError, name def close(self): # closing the proxy doesn't mean anything. # you can stop_listening(), and then start again. # the socket only exists while it is listening if self.connection: self.connection.close()class RawServerMixin(object): def __init__(self, config=None, noisy=True, tos=0): self.noisy = noisy self.config = config if not self.config: self.config = {} self.tos = tos self.sigint_flag = None self.sigint_installed = False # going away soon. call _context_wrap on the context. def _make_wrapped_call(self, _f, *args, **kwargs): wrapper = kwargs.pop('wrapper', None) try: _f(*args, **kwargs) except KeyboardInterrupt: raise except Exception, e: # hopefully nothing raises strings # Incoming sockets can be assigned to a particular torrent during # a data_came_in call, and it's possible (though not likely) that # there could be a torrent-specific exception during the same call. # Therefore read the context after the call. context = None if wrapper is not None: context = wrapper.context if context is not None: context.got_exception(*sys.exc_info()) elif self.noisy: rawserver_logger.exception("Error in _make_wrapped_call for %s", _f.__name__) # must be called from the main thread def install_sigint_handler(self, flag = None): if flag is not None: self.sigint_flag = flag signal.signal(signal.SIGINT, self._handler) self.sigint_installed = True def _handler(self, signum, frame): if self.sigint_flag: self.external_add_task(0, self.sigint_flag.set) elif self.doneflag: self.external_add_task(0, self.doneflag.set) # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt, # in case the program is in an infinite loop signal.signal(signal.SIGINT, signal.default_int_handler)class RawServer(RawServerMixin): """RawServer encapsulates I/O and task scheduling. I/O corresponds to the arrival data on a file descriptor, and a task is a scheduled callback. A task is scheduled using add_task or external_add_task. add_task is used from within the thread running the RawServer, external_add_task from other threads. tracker.py provides a simple example of how to use RawServer. 1. creates an instance of RawServer r = RawServer(config) 2. creates a socket by a call to create_serversocket. s = r.create_serversocket(config['port'], config['bind'], True) 3. tells the raw server to listen to the socket and associate a protocol handler with the socket. r.start_listening(s, HTTPHandler(t.get, config['min_time_between_log_flushes'])) 4. tells the raw_server to listen for I/O or scheduled tasks until stop is called. r.listen_forever() When a remote client opens a connection, a new socket is returned from the server socket's accept method and the socket is assigned the same handler as was assigned to the server socket. As data arrives on a socket, the handler's data_came_in member function is called. It is up to the handler to interpret the data and/or pass it on to other objects. In the tracker, the HTTP protocol handler passes the arriving data to an HTTPConnection object which maintains state specific to a given connection. For outgoing connections, the call start_connection() is used. """ def __init__(self, config=None, noisy=True, tos=0): """config is a dict that contains option-value pairs. 'tos' is passed to setsockopt to set the IP Type of Service (i.e., DiffServ byte), in IPv4 headers.""" RawServerMixin.__init__(self, config, noisy, tos) self.doneflag = None # init is fine until the loop starts self.ident = thread.get_ident()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -