⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rawserver_twisted.py

📁 bittorrent source by python. please enjoy
💻 PY
📖 第 1 页 / 共 3 页
字号:
        if old_buffer._buffer_list:            self.consumer.writeSequence(old_buffer._buffer_list)            old_buffer._buffer_list[:] = []    def add(self, b):        self._is_flushed = False        self.consumer.write(b)    def stopWriting(self):        pass        def is_flushed(self):        return self._is_flushed        def _flushed(self):        self._is_flushed = True        self.callback_onflushed()        class OutputBuffer(object):    # This is an IPullProducer which has an unlimited buffer size,    # and calls a callback when the buffer is completely flushed    def __init__(self, callback_onflushed):        self.consumer = None        self.registered = False        self.callback_onflushed = callback_onflushed        self._buffer_list = []    def is_flushed(self):        return (len(self._buffer_list) == 0)    def attachConsumer(self, consumer):        self.consumer = consumer        if not self.registered:            self.beginWriting()                def add(self, b):        self._buffer_list.append(b)        if self.consumer and not self.registered:            self.beginWriting()    def beginWriting(self):        self.stopWriting()        assert self.consumer, "You must attachConsumer before you beginWriting"        self.registered = True        self.consumer.registerProducer(self, False)    def stopWriting(self):        if not self.registered:            return        self.registered = False        if self.consumer:            try:                self.consumer.unregisterProducer()            except KeyError:                # bug in iocpreactor: http://twistedmatrix.com/trac/ticket/1657                pass    def resumeProducing(self):        if not self.registered:            return        assert self.consumer, "You must attachConsumer before you resumeProducing"        to_write = len(self._buffer_list)        if to_write > 0:            self.consumer.writeSequence(self._buffer_list)            del self._buffer_list[:]            self.callback_onflushed()        else:            self.stopWriting()    def pauseProducing(self):        pass    def stopProducing(self):        passclass CallbackConnection(object):    def attachTransport(self, transport, s):        s.attach_transport(self, transport=transport, reset_timeout=self.optionalResetTimeout)        self.connection = s    def connectionMade(self):        s = self.connection        s.handler.connection_made(s)        self.optionalResetTimeout()        self.factory.rawserver.connectionMade(s)    def connectionLost(self, reason):        reactor.callLater(0, self.post_connectionLost, reason)    # twisted api inconsistancy workaround    # sometimes connectionLost is called (not queued) from inside write()    def post_connectionLost(self, reason):        s = self.connection        #print s.ip, s.port, reason.getErrorMessage()        self.factory.rawserver._remove_socket(self.connection, was_connected=True)    def dataReceived(self, data):        self.optionalResetTimeout()        s = self.connection        s.rawserver._make_wrapped_call(s.handler.data_came_in,                                       s, data, wrapper=s)    def datagramReceived(self, data, (host, port)):        s = self.connection        s.rawserver._make_wrapped_call(s.handler.data_came_in,                                       (host, port), data, wrapper=s)    def optionalResetTimeout(self):        if self.can_timeout:            self.resetTimeout()class CallbackProtocol(CallbackConnection, TimeoutMixin, Protocol):    def makeConnection(self, transport):        self.can_timeout = True        self.setTimeout(self.factory.rawserver.config['socket_timeout'])        outgoing = False        key = None        host = transport.getHost()        if isinstance(host, address.UNIXAddress):            key = host.name        else:            key = host.port        try:            addr = (transport.getPeer().host, transport.getPeer().port)            c = self.factory.pop_connection_data(addr)            outgoing = True        except KeyError:            # happens when this was an incoming connection            pass        except AttributeError:            # happens when the peer is a unix socket            pass        if not outgoing:            args = self.factory.get_connection_data(key)            c = ConnectionWrapper(*args)        self.attachTransport(transport, c)        Protocol.makeConnection(self, transport)class CallbackDatagramProtocol(CallbackConnection, DatagramProtocol):    def startProtocol(self):        self.can_timeout = False        self.attachTransport(self.transport, self.connection)        DatagramProtocol.startProtocol(self)class ConnectionFactory(ClientFactory):    def __init__(self):        self.connection_data = DictWithLists()    def add_connection_data(self, key, data):        self.connection_data.push_to_row(key, data)    def get_connection_data(self, key):        return self.connection_data.get_from_row(key)    def pop_connection_data(self, key):        return self.connection_data.pop_from_row(key)    def clientConnectionFailed(self, connector, reason):        peer = connector.getDestination()        addr = (peer.host, peer.port)        s = self.pop_connection_data(addr)        # opt-out        if not s.dying:            # this might not work - reason is not an exception            s.handler.connection_failed(addr, reason)            s.dying = True        self.rawserver._remove_socket(s)# storage for socket creation requestions, and proxy once the connection is madeclass SocketRequestProxy(object):    def __init__(self, port, bind, tos, protocol):        self.port = port        self.bind = bind        self.tos = tos        self.protocol = protocol        self.connection = None    def __getattr__(self, name):        try:            return getattr(self.connection, name)        except:            raise AttributeError, name    def close(self):        # closing the proxy doesn't mean anything.        # you can stop_listening(), and then start again.        # the socket only exists while it is listening        if self.connection:            self.connection.close()class RawServerMixin(object):    def __init__(self, config=None, noisy=True, tos=0):        self.noisy = noisy        self.config = config        if not self.config:            self.config = {}        self.tos = tos        self.sigint_flag = None        self.sigint_installed = False            # going away soon. call _context_wrap on the context.    def _make_wrapped_call(self, _f, *args, **kwargs):        wrapper = kwargs.pop('wrapper', None)        try:            _f(*args, **kwargs)        except KeyboardInterrupt:            raise        except Exception, e:         # hopefully nothing raises strings            # Incoming sockets can be assigned to a particular torrent during            # a data_came_in call, and it's possible (though not likely) that            # there could be a torrent-specific exception during the same call.            # Therefore read the context after the call.            context = None            if wrapper is not None:                context = wrapper.context            if context is not None:                context.got_exception(*sys.exc_info())            elif self.noisy:                rawserver_logger.exception("Error in _make_wrapped_call for %s",                                           _f.__name__)       # must be called from the main thread    def install_sigint_handler(self, flag = None):        if flag is not None:            self.sigint_flag = flag        signal.signal(signal.SIGINT, self._handler)        self.sigint_installed = True    def _handler(self, signum, frame):        if self.sigint_flag:            self.external_add_task(0, self.sigint_flag.set)        elif self.doneflag:            self.external_add_task(0, self.doneflag.set)               # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt,        # in case the program is in an infinite loop        signal.signal(signal.SIGINT, signal.default_int_handler)class RawServer(RawServerMixin):    """RawServer encapsulates I/O and task scheduling.       I/O corresponds to the arrival data on a file descriptor,       and a task is a scheduled callback.  A task is scheduled       using add_task or external_add_task.  add_task is used from within the       thread running the RawServer, external_add_task from other threads.       tracker.py provides a simple example of how to use RawServer.        1. creates an instance of RawServer            r = RawServer(config)        2. creates a socket by a call to create_serversocket.            s = r.create_serversocket(config['port'], config['bind'], True)        3. tells the raw server to listen to the socket and associate           a protocol handler with the socket.            r.start_listening(s,                HTTPHandler(t.get, config['min_time_between_log_flushes']))        4. tells the raw_server to listen for I/O or scheduled tasks           until stop is called.            r.listen_forever()           When a remote client opens a connection, a new socket is           returned from the server socket's accept method and the           socket is assigned the same handler as was assigned to the           server socket.           As data arrives on a socket, the handler's data_came_in           member function is called.  It is up to the handler to           interpret the data and/or pass it on to other objects.           In the tracker, the HTTP protocol handler passes the arriving data           to an HTTPConnection object which maintains state specific           to a given connection.         For outgoing connections, the call start_connection() is used.         """    def __init__(self, config=None, noisy=True, tos=0):        """config is a dict that contains option-value pairs.           'tos' is passed to setsockopt to set the IP Type of Service (i.e.,           DiffServ byte), in IPv4 headers."""        RawServerMixin.__init__(self, config, noisy, tos)        self.doneflag = None        # init is fine until the loop starts        self.ident = thread.get_ident()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -