📄 rawserver_twisted.py
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.1 (the License). You may not copy or use this file, in either# source code or executable form, except in compliance with the License. You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License# for the specific language governing rights and limitations under the# License.# Written by Greg Hazelimport osimport sysimport socketimport signalimport stringimport structimport threadimport loggingimport threadingimport tracebackfrom BitTorrent.translation import _from BitTorrent import BTFailurefrom BitTorrent.obsoletepythonsupport import setfrom BitTorrent.DictWithLists import DictWithListsfrom BitTorrent.defer import DeferredEvent, Deferred, run_deferred###############################################################profile = Trueprofile = Falseif profile: try: import hotshot import hotshot.stats prof_file_name = 'rawserver.prof' except ImportError, e: print "profiling not available:", e profile = False##############################################################if 'twisted.internet.reactor' in sys.modules: print ("twisted.internet.reactor was imported before BitTorrent.RawServer_twisted!\n" "I'll clean it up for you, but don't do that!\n" "Existing reference may be for the wrong reactor!\n" "!") del sys.modules['twisted.internet.reactor']from twisted.python import threadable# needed for twisted 1.3# otherwise the 'thread safety' functions are not 'thread safe'threadable.init(1)letters = set(string.letters)main_thread = thread.get_ident()noSignals = Trueiocpreactor = Falseif os.name == 'nt': try: from twisted.internet import iocpreactor iocpreactor.proactor.install() noSignals = False iocpreactor = True except: # just as limited (if not more) as select, and also buggy #try: # from twisted.internet import win32eventreactor # win32eventreactor.install() #except: # pass passelse: try: from twisted.internet import kqreactor kqreactor.install() except: try: from twisted.internet import pollreactor pollreactor.install() except: passrawserver_logger = logging.getLogger('RawServer')#the default reactor is select-based, and will be install()ed if another has notfrom twisted.internet import reactor, task, error# as far as I know, we work with twisted 1.3 and >= 2.0#import twisted.copyright#if twisted.copyright.version.split('.') < 2:# raise ImportError(_("RawServer_twisted requires twisted 2.0.0 or greater"))from twisted.internet.protocol import DatagramProtocol, Protocol, ClientFactoryfrom twisted.protocols.policies import TimeoutMixinfrom twisted.internet import interfaces, addressfrom BitTorrent.ConnectionRateLimitReactor import connectionRateLimitReactorNOLINGER = struct.pack('ii', 1, 0)# python sucks.SHUT_RD = getattr(socket, 'SHUT_RD', 0)SHUT_WR = getattr(socket, 'SHUT_WR', 1)# this is a base class for all the callbacks the server could useclass Handler(object): # there is no connection_started. # a connection request can result in either connection_failed or # connection_made # called when the connection is ready for writiing def connection_made(self, s): pass # called when a connection request failed (failed, refused, or requested to close) def connection_failed(self, addr, exception): pass def data_came_in(self, addr, data): pass # called once when the current write buffer empties completely def connection_flushed(self, s): pass # called when a connection dies (lost or requested to close) def connection_lost(self, s): pass class ConnectionWrapper(object): def __init__(self, rawserver, handler, context, tos=0): self.ip = None # peer ip self.tos = tos self.port = None # peer port self.dying = False self.paused = False self.encrypt = None self.connector = None self.transport = None self.reset_timeout = None self.callback_connection = None self.buffer = OutputBuffer(self._flushed) self.post_init(rawserver, handler, context) def post_init(self, rawserver, handler, context): self.rawserver = rawserver self.handler = handler self.context = context if self.rawserver: self.rawserver.single_sockets.add(self) def get_socket(self): s = None try: s = self.transport.getHandle() except: try: # iocpreactor doesn't implement ISystemHandle like it should s = self.transport.socket except: pass return s def pause_reading(self): # interfaces are the stupedist crap ever if (hasattr(interfaces.IProducer, "providedBy") and not interfaces.IProducer.providedBy(self.transport)): print "No producer", self.ip, self.port, self.transport return # not explicitly needed, but iocpreactor has a bug where the author is a moron if self.paused: return self.transport.pauseProducing() self.paused = True def resume_reading(self): if (hasattr(interfaces.IProducer, "providedBy") and not interfaces.IProducer.providedBy(self.transport)): print "No producer", self.ip, self.port, self.transport return # not explicitly needed, but iocpreactor has a bug where the author is a moron if not self.paused: return self.paused = False try: self.transport.resumeProducing() except Exception, e: # I bet these are harmless print "resumeProducing error", type(e), e def attach_transport(self, callback_connection, transport, reset_timeout): self.transport = transport self.callback_connection = callback_connection self.reset_timeout = reset_timeout if hasattr(transport, 'addBufferCallback'): self.buffer = PassBuffer(self.transport, self._flushed, self.buffer) elif hasattr(transport, 'registerProducer'): # Multicast uses sendto, which does not buffer. # It has no producer api self.buffer.attachConsumer(self.transport) try: address = self.transport.getPeer() except: # udp, for example address = self.transport.getHost() try: self.ip = address.host self.port = address.port except: # unix sockets, for example pass if self.tos != 0: s = self.get_socket() try: s.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos) except socket.error: pass def sendto(self, packet, flags, addr): ret = None try: ret = self.transport.write(packet, addr) except Exception, e: rawserver_logger.warning("UDP sendto failed: %s" % unicode(e.args[0])) return ret def write(self, b): if self.encrypt is not None: b = self.encrypt(b) # bleh if isinstance(b, buffer): b = str(b) self.buffer.add(b) def _flushed(self): s = self # why do you tease me so? if s.handler is not None: # calling flushed from the write is bad form self.rawserver.add_task(0, s.handler.connection_flushed, s) def is_flushed(self): return self.buffer.is_flushed() def shutdown(self, how): if how == SHUT_WR: if hasattr(self.transport, "loseWriteConnection"): self.transport.loseWriteConnection() else: # twisted 1.3 sucks try: self.transport.socket.shutdown(how) except: pass self.buffer.stopWriting() elif how == SHUT_RD: self.transport.stopListening() else: self.close() def close(self): if self.buffer: self.buffer.stopWriting() if self.rawserver.config['close_with_rst']: try: s = self.get_socket() s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER) except: pass if self.transport: if self in self.rawserver.udp_sockets: # udp connections should only call stopListening self.transport.stopListening() else: self.transport.loseConnection() else: if self.connector: self.connector.disconnect() def _cleanup(self): if self.buffer: self.buffer.consumer = None del self.buffer self.handler = None del self.transport if self.callback_connection: if self.callback_connection.can_timeout: self.callback_connection.setTimeout(None) self.callback_connection.connection = None del self.callback_connection# hint: not actually a bufferclass PassBuffer(object): def __init__(self, consumer, callback_onflushed, old_buffer): self.consumer = consumer self.callback_onflushed = callback_onflushed self._is_flushed = False self.consumer.addBufferCallback(self._flushed, "buffer empty") # swallow the data written to the old buffer
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -