⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rawserver.py

📁 BitTorrent(简称BT
💻 PY
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.0 (the License).  You may not copy or use this file, in either# source code or executable form, except in compliance with the License.  You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License# for the specific language governing rights and limitations under the# License.# Written by Bram Cohen, Uoti Urpalaimport osimport sysimport socketimport structfrom bisect import insortfrom cStringIO import StringIOfrom traceback import print_excfrom errno import EWOULDBLOCK, ENOBUFSfrom BitTorrent.platform import bttimefrom BitTorrent import WARNING, CRITICAL, FAQ_URLtry:    from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP    timemult = 1000except ImportError:    from BitTorrent.selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP    timemult = 1NOLINGER = struct.pack('ii', 1, 0)class SingleSocket(object):    def __init__(self, raw_server, sock, handler, context, ip=None):        self.raw_server = raw_server        self.socket = sock        self.handler = handler        self.buffer = []        self.last_hit = bttime()        self.fileno = sock.fileno()        self.connected = False        self.context = context        if ip is not None:            self.ip = ip        else:            try:                peername = self.socket.getpeername()            except socket.error:                self.ip = 'unknown'            else:                try:                    self.ip = peername[0]                except:                    assert isinstance(peername, basestring)                    self.ip = peername # UNIX socket, not really ip    def close(self):        sock = self.socket        self.socket = None        self.buffer = []        del self.raw_server.single_sockets[self.fileno]        self.raw_server.poll.unregister(sock)        self.handler = None        if self.raw_server.config['close_with_rst']:            sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)        sock.close()    def shutdown(self, val):        self.socket.shutdown(val)    def is_flushed(self):        return len(self.buffer) == 0    def write(self, s):        assert self.socket is not None        self.buffer.append(s)        if len(self.buffer) == 1:            self.try_write()    def try_write(self):        if self.connected:            try:                while self.buffer != []:                    amount = self.socket.send(self.buffer[0])                    if amount != len(self.buffer[0]):                        if amount != 0:                            self.buffer[0] = self.buffer[0][amount:]                        break                    del self.buffer[0]            except socket.error, e:                code, msg = e                if code != EWOULDBLOCK:                    self.raw_server.dead_from_write.append(self)                    return        if self.buffer == []:            self.raw_server.poll.register(self.socket, POLLIN)        else:            self.raw_server.poll.register(self.socket, POLLIN | POLLOUT)def default_error_handler(level, message):    print messageclass RawServer(object):    def __init__(self, doneflag, config, noisy=True,                 errorfunc=default_error_handler, tos=0):        self.config = config        self.tos = tos        self.poll = poll()        # {socket: SingleSocket}        self.single_sockets = {}        self.dead_from_write = []        self.doneflag = doneflag        self.noisy = noisy        self.errorfunc = errorfunc        self.funcs = []        self.externally_added_tasks = []        self.listening_handlers = {}        self.serversockets = {}        self.live_contexts = {None : True}        self.add_task(self.scan_for_timeouts, config['timeout_check_interval'])        if sys.platform != 'win32':            self.wakeupfds = os.pipe()            self.poll.register(self.wakeupfds[0], POLLIN)        else:            # Windows doesn't support pipes with select(). Just prevent sleeps            # longer than a second instead of proper wakeup for now.            self.wakeupfds = (None, None)            def wakeup():                self.add_task(wakeup, 1)            wakeup()    def add_context(self, context):        self.live_contexts[context] = True    def remove_context(self, context):        del self.live_contexts[context]        self.funcs = [x for x in self.funcs if x[2] != context]    def add_task(self, func, delay, context=None):        if context in self.live_contexts:            insort(self.funcs, (bttime() + delay, func, context))    def external_add_task(self, func, delay, context=None):        self.externally_added_tasks.append((func, delay, context))        # Wake up the RawServer thread in case it's sleeping in poll()        if self.wakeupfds[1] is not None:            os.write(self.wakeupfds[1], 'X')    def scan_for_timeouts(self):        self.add_task(self.scan_for_timeouts,                      self.config['timeout_check_interval'])        t = bttime() - self.config['socket_timeout']        tokill = []        for s in self.single_sockets.values():            if s.last_hit < t:                tokill.append(s)        for k in tokill:            if k.socket is not None:                self._close_socket(k)    def create_serversocket(port, bind='', reuse=False, tos=0):        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        if reuse and os.name != 'nt':            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)        server.setblocking(0)        if tos != 0:            try:                server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)            except:                pass        server.bind((bind, port))        server.listen(5)        return server    create_serversocket = staticmethod(create_serversocket)    def start_listening(self, serversocket, handler, context=None):        self.listening_handlers[serversocket.fileno()] = (handler, context)        self.serversockets[serversocket.fileno()] = serversocket        self.poll.register(serversocket, POLLIN)    def stop_listening(self, serversocket):        del self.listening_handlers[serversocket.fileno()]        del self.serversockets[serversocket.fileno()]        self.poll.unregister(serversocket)    def start_connection(self, dns, handler=None, context=None, do_bind=True):        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        sock.setblocking(0)        bindaddr = do_bind and self.config['bind']        if bindaddr:            sock.bind((bindaddr, 0))        if self.tos != 0:            try:                sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)            except:                pass        try:            sock.connect_ex(dns)        except socket.error:            sock.close()            raise        except Exception, e:            sock.close()            raise socket.error(str(e))        self.poll.register(sock, POLLIN)        s = SingleSocket(self, sock, handler, context, dns[0])        self.single_sockets[sock.fileno()] = s        return s    def wrap_socket(self, sock, handler, context=None, ip=None):        sock.setblocking(0)        self.poll.register(sock, POLLIN)        s = SingleSocket(self, sock, handler, context, ip)        self.single_sockets[sock.fileno()] = s        return s    def _handle_events(self, events):        for sock, event in events:            if sock in self.serversockets:                s = self.serversockets[sock]                if event & (POLLHUP | POLLERR) != 0:                    self.poll.unregister(s)                    s.close()                    self.errorfunc(CRITICAL, 'lost server socket')                else:                    handler, context = self.listening_handlers[sock]                    try:                        newsock, addr = s.accept()                    except socket.error:                        continue                    try:                        newsock.setblocking(0)                        nss = SingleSocket(self, newsock, handler, context)                        self.single_sockets[newsock.fileno()] = nss                        self.poll.register(newsock, POLLIN)                        self._make_wrapped_call(handler. \                           external_connection_made, (nss,), context=context)                    except socket.error, e:                        self.errorfunc(WARNING, "Error handling accepted "                                       "connection: "+str(e))            else:                s = self.single_sockets.get(sock)                if s is None:                    if sock == self.wakeupfds[0]:                        # Another thread wrote this just to wake us up.                        os.read(sock, 1)                    continue                s.connected = True                if event & POLLERR:                    self._close_socket(s)                    continue                if event & (POLLIN | POLLHUP):                    s.last_hit = bttime()                    try:                        data = s.socket.recv(100000)                    except socket.error, e:                        code, msg = e                        if code != EWOULDBLOCK:                            self._close_socket(s)                        continue                    if data == '':                        self._close_socket(s)                    else:                        self._make_wrapped_call(s.handler.data_came_in,                                                (s, data), s)                # data_came_in could have closed the socket (s.socket = None)                if event & POLLOUT and s.socket is not None:                    s.try_write()                    if s.is_flushed():                        self._make_wrapped_call(s.handler.connection_flushed,                                                (s,), s)    def _pop_externally_added(self):        while self.externally_added_tasks:            task = self.externally_added_tasks.pop(0)            self.add_task(*task)    def listen_forever(self):        while not self.doneflag.isSet():            try:                self._pop_externally_added()                if not self.funcs:                    period = 1e9                else:                    period = self.funcs[0][0] - bttime()                if period < 0:                    period = 0                events = self.poll.poll(period * timemult)                if self.doneflag.isSet():                    return                while self.funcs and self.funcs[0][0] <= bttime():                    garbage, func, context = self.funcs.pop(0)                    self._make_wrapped_call(func, (), context=context)                self._close_dead()                self._handle_events(events)                if self.doneflag.isSet():                    return                self._close_dead()            except error, e:                if self.doneflag.isSet():                    return                # I can't find a coherent explanation for what the behavior                # should be here, and people report conflicting behavior,                # so I'll just try all the possibilities                try:                    code, msg, desc = e                except:                    try:                        code, msg = e                    except:                        code = ENOBUFS                if code == ENOBUFS:                    self.errorfunc(CRITICAL, "Have to exit due to the TCP "                                   "stack flaking out. "                                   "Please see the FAQ at %s"%FAQ_URL)                    return            except KeyboardInterrupt:                print_exc()                return            except:                data = StringIO()                print_exc(file=data)                self.errorfunc(CRITICAL, data.getvalue())    def _make_wrapped_call(self, function, args, socket=None, context=None):        try:            function(*args)        except KeyboardInterrupt:            raise        except Exception, e:         # hopefully nothing raises strings            # Incoming sockets can be assigned to a particular torrent during            # a data_came_in call, and it's possible (though not likely) that            # there could be a torrent-specific exception during the same call.            # Therefore read the context after the call.            if socket is not None:                context = socket.context            if self.noisy and context is None:                data = StringIO()                print_exc(file=data)                self.errorfunc(CRITICAL, data.getvalue())            if context is not None:                context.got_exception(e)    def _close_dead(self):        while len(self.dead_from_write) > 0:            old = self.dead_from_write            self.dead_from_write = []            for s in old:                if s.socket is not None:                    self._close_socket(s)    def _close_socket(self, s):        sock = s.socket.fileno()        if self.config['close_with_rst']:            s.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)        s.socket.close()        self.poll.unregister(sock)        del self.single_sockets[sock]        s.socket = None        self._make_wrapped_call(s.handler.connection_lost, (s,), s)        s.handler = None

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -