📄 rawserver.py
字号:
# Written by Bram Cohen# see LICENSE.txt for license informationfrom bisect import insortimport socketfrom cStringIO import StringIOfrom traceback import print_excfrom errno import EWOULDBLOCK, ECONNREFUSED, EHOSTUNREACHtry: from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP timemult = 1000except ImportError: from selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP timemult = 1from threading import Thread, Eventfrom time import time, sleepimport sysfrom random import randrangetrue = 1false = 0all = POLLIN | POLLOUTclass SingleSocket: def __init__(self, raw_server, sock, handler, ip = None): self.raw_server = raw_server self.socket = sock self.handler = handler self.buffer = [] self.last_hit = time() self.fileno = sock.fileno() self.connected = false try: self.ip = self.socket.getpeername()[0] except: if ip is None: self.ip = 'unknown' else: self.ip = ip def get_ip(self): return self.ip def close(self): sock = self.socket self.socket = None self.buffer = [] del self.raw_server.single_sockets[self.fileno] self.raw_server.poll.unregister(sock) sock.close() def shutdown(self, val): self.socket.shutdown(val) def is_flushed(self): return len(self.buffer) == 0 def write(self, s): assert self.socket is not None self.buffer.append(s) if len(self.buffer) == 1: self.try_write() def try_write(self): if self.connected: try: while self.buffer != []: amount = self.socket.send(self.buffer[0]) if amount != len(self.buffer[0]): if amount != 0: self.buffer[0] = self.buffer[0][amount:] break del self.buffer[0] except socket.error, e: try: code, msg = e except: raise TypeError, "cannot decode "+str(e) if code != EWOULDBLOCK: self.raw_server.dead_from_write.append(self) return if self.buffer == []: self.raw_server.poll.register(self.socket, POLLIN) else: self.raw_server.poll.register(self.socket, all)class RawServer: def __init__(self, doneflag, timeout_check_interval, timeout, noisy = true, errorfunc = None): self.timeout_check_interval = timeout_check_interval self.timeout = timeout self.poll = poll() # {socket: SingleSocket} self.single_sockets = {} self.dead_from_write = [] self.doneflag = doneflag self.noisy = noisy self.errorfunc = errorfunc self.exccount = 0 self.funcs = [] self.externally_added = [] self.add_task(self.scan_for_timeouts, timeout_check_interval) def add_task(self, func, delay): insort(self.funcs, (time() + delay, func)) def external_add_task(self, func, delay = 0): self.externally_added.append((func, delay)) def scan_for_timeouts(self): self.add_task(self.scan_for_timeouts, self.timeout_check_interval) t = time() - self.timeout tokill = [] for s in self.single_sockets.values(): if s.last_hit < t: tokill.append(s) for k in tokill: if k.socket is not None: self._close_socket(k) def bind(self, port, bind = '', reuse = false): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if reuse: server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.setblocking(0) server.bind((bind, port)) server.listen(5) self.poll.register(server, POLLIN) self.server = server def start_connection(self, dns, handler = None): if handler is None: handler = self.handler sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(0) try: sock.connect_ex(dns) except socket.error: raise except Exception, e: raise socket.error(str(e)) self.poll.register(sock, POLLIN) s = SingleSocket(self, sock, handler, dns[0]) self.single_sockets[sock.fileno()] = s return s def handle_events(self, events): for sock, event in events: if sock == self.server.fileno(): if event & (POLLHUP | POLLERR) != 0: self.poll.unregister(self.server) self.server.close() print "lost server socket" else: try: newsock, addr = self.server.accept() newsock.setblocking(0)
nss = SingleSocket(self, newsock, self.handler)
self.single_sockets[newsock.fileno()] = nss
self.poll.register(newsock, POLLIN)
self.handler.external_connection_made(nss)
except socket.error:
sleep(1)
else: s = self.single_sockets.get(sock) if s is None: continue s.connected = true if (event & (POLLHUP | POLLERR)) != 0: self._close_socket(s) continue if (event & POLLIN) != 0: try: s.last_hit = time() data = s.socket.recv(100000) if data == '': self._close_socket(s) else: s.handler.data_came_in(s, data) except socket.error, e: code, msg = e if code != EWOULDBLOCK: self._close_socket(s) continue if (event & POLLOUT) != 0 and s.socket is not None and not s.is_flushed(): s.try_write() if s.is_flushed(): s.handler.connection_flushed(s) def pop_external(self): try: while true: (a, b) = self.externally_added.pop() self.add_task(a, b) except IndexError: pass def listen_forever(self, handler): self.handler = handler try: while not self.doneflag.isSet(): try: self.pop_external() if len(self.funcs) == 0: period = 2 ** 30 else: period = self.funcs[0][0] - time() if period < 0: period = 0 events = self.poll.poll(period * timemult) if self.doneflag.isSet(): return while len(self.funcs) > 0 and self.funcs[0][0] <= time(): garbage, func = self.funcs[0] del self.funcs[0] try: func() except KeyboardInterrupt: self.exception(true) return except: if self.noisy: self.exception() self._close_dead() self.handle_events(events) if self.doneflag.isSet(): return self._close_dead() except error: if self.doneflag.isSet(): return except KeyboardInterrupt: self.exception(true) return except: self.exception() if self.exccount > 10: return finally: for ss in self.single_sockets.values(): ss.close() self.server.close() def exception(self, kbint = false): self.exccount += 1 if self.errorfunc is None: print_exc() else: data = StringIO() print_exc(file = data) print data.getvalue() # report exception here too if not kbint: # don't report here if it's a keyboard interrupt self.errorfunc(data.getvalue()) def _close_dead(self): while len(self.dead_from_write) > 0: old = self.dead_from_write self.dead_from_write = [] for s in old: if s.socket is not None: self._close_socket(s) def _close_socket(self, s): sock = s.socket.fileno() s.socket.close() self.poll.unregister(sock) del self.single_sockets[sock] s.socket = None s.handler.connection_lost(s)# everything below is for testingclass DummyHandler: def __init__(self): self.external_made = [] self.data_in = [] self.lost = [] def external_connection_made(self, s): self.external_made.append(s) def data_came_in(self, s, data): self.data_in.append((s, data)) def connection_lost(self, s): self.lost.append(s) def connection_flushed(self, s): passdef sl(rs, handler, port): rs.bind(port) Thread(target = rs.listen_forever, args = [handler]).start()def loop(rs): x = [] def r(rs = rs, x = x): rs.add_task(x[0], .1) x.append(r) rs.add_task(r, .1)def test_starting_side_close(): try: da = DummyHandler() fa = Event() sa = RawServer(fa, 100, 100) loop(sa) sl(sa, da, 5000) db = DummyHandler() fb = Event()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -