⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rawserver.py

📁 BitTorrentABC-Linux-V.2.4.3源码
💻 PY
📖 第 1 页 / 共 2 页
字号:
# Written by Bram Cohen# see LICENSE.txt for license informationfrom bisect import insortimport socketfrom cStringIO import StringIOfrom traceback import print_excfrom errno import EWOULDBLOCK, ECONNREFUSED, EHOSTUNREACHtry:    from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP    timemult = 1000except ImportError:    from selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP    timemult = 1from threading import Thread, Eventfrom time import time, sleepimport sysfrom random import randrangetrue = 1false = 0all = POLLIN | POLLOUTclass SingleSocket:    def __init__(self, raw_server, sock, handler, ip = None):        self.raw_server = raw_server        self.socket = sock        self.handler = handler        self.buffer = []        self.last_hit = time()        self.fileno = sock.fileno()        self.connected = false        try:            self.ip = self.socket.getpeername()[0]        except:            if ip is None:                self.ip = 'unknown'            else:                self.ip = ip            def get_ip(self):        return self.ip            def close(self):        sock = self.socket        self.socket = None        self.buffer = []        del self.raw_server.single_sockets[self.fileno]        self.raw_server.poll.unregister(sock)        sock.close()    def shutdown(self, val):        self.socket.shutdown(val)    def is_flushed(self):        return len(self.buffer) == 0    def write(self, s):        assert self.socket is not None        self.buffer.append(s)        if len(self.buffer) == 1:            self.try_write()    def try_write(self):        if self.connected:            try:                while self.buffer != []:                    amount = self.socket.send(self.buffer[0])                    if amount != len(self.buffer[0]):                        if amount != 0:                            self.buffer[0] = self.buffer[0][amount:]                        break                    del self.buffer[0]            except socket.error, e:                try:                    code, msg = e                except:                    raise TypeError, "cannot decode "+str(e)                if code != EWOULDBLOCK:                    self.raw_server.dead_from_write.append(self)                    return        if self.buffer == []:            self.raw_server.poll.register(self.socket, POLLIN)        else:            self.raw_server.poll.register(self.socket, all)class RawServer:    def __init__(self, doneflag, timeout_check_interval, timeout,                 noisy = true, errorfunc = None):        self.timeout_check_interval = timeout_check_interval        self.timeout = timeout        self.poll = poll()        # {socket: SingleSocket}        self.single_sockets = {}        self.dead_from_write = []        self.doneflag = doneflag        self.noisy = noisy        self.errorfunc = errorfunc        self.exccount = 0        self.funcs = []        self.externally_added = []        self.add_task(self.scan_for_timeouts, timeout_check_interval)    def add_task(self, func, delay):        insort(self.funcs, (time() + delay, func))    def external_add_task(self, func, delay = 0):        self.externally_added.append((func, delay))    def scan_for_timeouts(self):        self.add_task(self.scan_for_timeouts, self.timeout_check_interval)        t = time() - self.timeout        tokill = []        for s in self.single_sockets.values():            if s.last_hit < t:                tokill.append(s)        for k in tokill:            if k.socket is not None:                self._close_socket(k)    def bind(self, port, bind = '', reuse = false):        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        if reuse:            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)        server.setblocking(0)        server.bind((bind, port))        server.listen(5)        self.poll.register(server, POLLIN)        self.server = server    def start_connection(self, dns, handler = None):        if handler is None:            handler = self.handler        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        sock.setblocking(0)        try:            sock.connect_ex(dns)        except socket.error:            raise        except Exception, e:            raise socket.error(str(e))        self.poll.register(sock, POLLIN)        s = SingleSocket(self, sock, handler, dns[0])        self.single_sockets[sock.fileno()] = s        return s            def handle_events(self, events):        for sock, event in events:            if sock == self.server.fileno():                if event & (POLLHUP | POLLERR) != 0:                    self.poll.unregister(self.server)                    self.server.close()                    print "lost server socket"                else:                    try:                        newsock, addr = self.server.accept()                        newsock.setblocking(0)
                        nss = SingleSocket(self, newsock, self.handler)
                        self.single_sockets[newsock.fileno()] = nss
                        self.poll.register(newsock, POLLIN)
                        self.handler.external_connection_made(nss)
                    except socket.error:
                        sleep(1)
            else:                s = self.single_sockets.get(sock)                if s is None:                    continue                s.connected = true                if (event & (POLLHUP | POLLERR)) != 0:                    self._close_socket(s)                    continue                if (event & POLLIN) != 0:                    try:                        s.last_hit = time()                        data = s.socket.recv(100000)                        if data == '':                            self._close_socket(s)                        else:                            s.handler.data_came_in(s, data)                    except socket.error, e:                        code, msg = e                        if code != EWOULDBLOCK:                            self._close_socket(s)                            continue                if (event & POLLOUT) != 0 and s.socket is not None and not s.is_flushed():                    s.try_write()                    if s.is_flushed():                        s.handler.connection_flushed(s)    def pop_external(self):        try:            while true:                (a, b) = self.externally_added.pop()                self.add_task(a, b)        except IndexError:            pass    def listen_forever(self, handler):        self.handler = handler        try:            while not self.doneflag.isSet():                try:                    self.pop_external()                    if len(self.funcs) == 0:                        period = 2 ** 30                    else:                        period = self.funcs[0][0] - time()                    if period < 0:                        period = 0                    events = self.poll.poll(period * timemult)                    if self.doneflag.isSet():                        return                    while len(self.funcs) > 0 and self.funcs[0][0] <= time():                        garbage, func = self.funcs[0]                        del self.funcs[0]                        try:                            func()                        except KeyboardInterrupt:                            self.exception(true)                            return                        except:                            if self.noisy:                                self.exception()                    self._close_dead()                    self.handle_events(events)                    if self.doneflag.isSet():                        return                    self._close_dead()                except error:                    if self.doneflag.isSet():                        return                except KeyboardInterrupt:                    self.exception(true)                    return                except:                    self.exception()                if self.exccount > 10:                    return        finally:            for ss in self.single_sockets.values():                ss.close()            self.server.close()    def exception(self, kbint = false):        self.exccount += 1        if self.errorfunc is None:            print_exc()        else:            data = StringIO()            print_exc(file = data)            print data.getvalue()   # report exception here too            if not kbint:           # don't report here if it's a keyboard interrupt                self.errorfunc(data.getvalue())    def _close_dead(self):        while len(self.dead_from_write) > 0:            old = self.dead_from_write            self.dead_from_write = []            for s in old:                if s.socket is not None:                    self._close_socket(s)    def _close_socket(self, s):        sock = s.socket.fileno()        s.socket.close()        self.poll.unregister(sock)        del self.single_sockets[sock]        s.socket = None        s.handler.connection_lost(s)# everything below is for testingclass DummyHandler:    def __init__(self):        self.external_made = []        self.data_in = []        self.lost = []    def external_connection_made(self, s):        self.external_made.append(s)        def data_came_in(self, s, data):        self.data_in.append((s, data))        def connection_lost(self, s):        self.lost.append(s)    def connection_flushed(self, s):        passdef sl(rs, handler, port):    rs.bind(port)    Thread(target = rs.listen_forever, args = [handler]).start()def loop(rs):    x = []    def r(rs = rs, x = x):        rs.add_task(x[0], .1)    x.append(r)    rs.add_task(r, .1)def test_starting_side_close():    try:        da = DummyHandler()        fa = Event()        sa = RawServer(fa, 100, 100)        loop(sa)        sl(sa, da, 5000)        db = DummyHandler()        fb = Event()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -