⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rawserver_twisted.py

📁 BitTorrent source code written in Python. Please enjoy.
💻 PY
📖 第 1 页 / 共 3 页
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.1 (the License).  You may not copy or use this file, in either# source code or executable form, except in compliance with the License.  You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License# for the specific language governing rights and limitations under the# License.# Written by Greg Hazelimport osimport sysimport socketimport signalimport stringimport structimport threadimport loggingimport threadingimport tracebackfrom BitTorrent.translation import _from BitTorrent import BTFailurefrom BitTorrent.obsoletepythonsupport import setfrom BitTorrent.DictWithLists import DictWithListsfrom BitTorrent.defer import DeferredEvent, Deferred, run_deferred###############################################################profile = Trueprofile = Falseif profile:    try:        import hotshot        import hotshot.stats        prof_file_name = 'rawserver.prof'    except ImportError, e:        print "profiling not available:", e        profile = False##############################################################if 'twisted.internet.reactor' in sys.modules:    print ("twisted.internet.reactor was imported before BitTorrent.RawServer_twisted!\n"           "I'll clean it up for you, but don't do that!\n"           "Existing reference may be for the wrong reactor!\n"           "!")    del sys.modules['twisted.internet.reactor']from twisted.python import threadable# needed for twisted 1.3# otherwise the 'thread safety' functions are not 'thread safe'threadable.init(1)letters = set(string.letters)main_thread = thread.get_ident()noSignals = Trueiocpreactor = Falseif os.name == 'nt':    try:        from twisted.internet import iocpreactor        iocpreactor.proactor.install()        noSignals = False        iocpreactor = True    
except:        # just as limited (if not more) as select, and also buggy        #try:        #    from twisted.internet import win32eventreactor        #    win32eventreactor.install()        #except:        #    pass        passelse:    try:        from twisted.internet import kqreactor        kqreactor.install()    except:        try:            from twisted.internet import pollreactor            pollreactor.install()        except:            passrawserver_logger = logging.getLogger('RawServer')#the default reactor is select-based, and will be install()ed if another has notfrom twisted.internet import reactor, task, error# as far as I know, we work with twisted 1.3 and >= 2.0#import twisted.copyright#if twisted.copyright.version.split('.') < 2:#    raise ImportError(_("RawServer_twisted requires twisted 2.0.0 or greater"))from twisted.internet.protocol import DatagramProtocol, Protocol, ClientFactoryfrom twisted.protocols.policies import TimeoutMixinfrom twisted.internet import interfaces, addressfrom BitTorrent.ConnectionRateLimitReactor import connectionRateLimitReactorNOLINGER = struct.pack('ii', 1, 0)# python sucks.SHUT_RD = getattr(socket, 'SHUT_RD', 0)SHUT_WR = getattr(socket, 'SHUT_WR', 1)# this is a base class for all the callbacks the server could useclass Handler(object):    # there is no connection_started.    
# a connection request can result in either connection_failed or    # connection_made    # called when the connection is ready for writiing    def connection_made(self, s):        pass    # called when a connection request failed (failed, refused, or requested to close)    def connection_failed(self, addr, exception):        pass    def data_came_in(self, addr, data):        pass    # called once when the current write buffer empties completely    def connection_flushed(self, s):        pass    # called when a connection dies (lost or requested to close)    def connection_lost(self, s):        pass        class ConnectionWrapper(object):    def __init__(self, rawserver, handler, context, tos=0):        self.ip = None             # peer ip        self.tos = tos        self.port = None           # peer port        self.dying = False        self.paused = False        self.encrypt = None        self.connector = None        self.transport = None        self.reset_timeout = None        self.callback_connection = None        self.buffer = OutputBuffer(self._flushed)        self.post_init(rawserver, handler, context)    def post_init(self, rawserver, handler, context):        self.rawserver = rawserver        self.handler = handler        self.context = context        if self.rawserver:            self.rawserver.single_sockets.add(self)    def get_socket(self):        s = None        try:            s = self.transport.getHandle()        except:            try:                # iocpreactor doesn't implement ISystemHandle like it should                s = self.transport.socket            except:                pass        return s    def pause_reading(self):        # interfaces are the stupedist crap ever        if (hasattr(interfaces.IProducer, "providedBy") and            not interfaces.IProducer.providedBy(self.transport)):            print "No producer", self.ip, self.port, self.transport            return        # not explicitly needed, but iocpreactor has a bug where 
the author is a moron        if self.paused:            return        self.transport.pauseProducing()        self.paused = True    def resume_reading(self):        if (hasattr(interfaces.IProducer, "providedBy") and            not interfaces.IProducer.providedBy(self.transport)):            print "No producer", self.ip, self.port, self.transport            return        # not explicitly needed, but iocpreactor has a bug where the author is a moron        if not self.paused:            return        self.paused = False        try:            self.transport.resumeProducing()        except Exception, e:            # I bet these are harmless            print "resumeProducing error", type(e), e    def attach_transport(self, callback_connection, transport, reset_timeout):        self.transport = transport        self.callback_connection = callback_connection        self.reset_timeout = reset_timeout        if hasattr(transport, 'addBufferCallback'):            self.buffer = PassBuffer(self.transport, self._flushed, self.buffer)        elif hasattr(transport, 'registerProducer'):            # Multicast uses sendto, which does not buffer.            
# It has no producer api            self.buffer.attachConsumer(self.transport)        try:            address = self.transport.getPeer()        except:            # udp, for example            address = self.transport.getHost()        try:            self.ip = address.host            self.port = address.port        except:            # unix sockets, for example            pass        if self.tos != 0:            s = self.get_socket()            try:                s.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)            except socket.error:                pass    def sendto(self, packet, flags, addr):        ret = None        try:            ret = self.transport.write(packet, addr)        except Exception, e:            rawserver_logger.warning("UDP sendto failed: %s" % unicode(e.args[0]))        return ret    def write(self, b):        if self.encrypt is not None:            b = self.encrypt(b)        # bleh        if isinstance(b, buffer):            b = str(b)        self.buffer.add(b)    def _flushed(self):        s = self        # why do you tease me so?        
if s.handler is not None:            # calling flushed from the write is bad form            self.rawserver.add_task(0, s.handler.connection_flushed, s)    def is_flushed(self):        return self.buffer.is_flushed()    def shutdown(self, how):        if how == SHUT_WR:            if hasattr(self.transport, "loseWriteConnection"):                self.transport.loseWriteConnection()            else:                # twisted 1.3 sucks                try:                    self.transport.socket.shutdown(how)                except:                    pass            self.buffer.stopWriting()        elif how == SHUT_RD:            self.transport.stopListening()        else:            self.close()    def close(self):        if self.buffer:            self.buffer.stopWriting()        if self.rawserver.config['close_with_rst']:            try:                s = self.get_socket()                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)            except:                pass        if self.transport:            if self in self.rawserver.udp_sockets:                # udp connections should only call stopListening                self.transport.stopListening()            else:                self.transport.loseConnection()        else:            if self.connector:                self.connector.disconnect()    def _cleanup(self):        if self.buffer:            self.buffer.consumer = None            del self.buffer        self.handler = None        del self.transport        if self.callback_connection:            if self.callback_connection.can_timeout:                self.callback_connection.setTimeout(None)            self.callback_connection.connection = None            del self.callback_connection# hint: not actually a bufferclass PassBuffer(object):    def __init__(self, consumer, callback_onflushed, old_buffer):        self.consumer = consumer        self.callback_onflushed = callback_onflushed        self._is_flushed = False        
self.consumer.addBufferCallback(self._flushed, "buffer empty")        # swallow the data written to the old buffer

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -