📄 mainloop.py
字号:
if not len(self.timeout_list): return [] current_time = time.time() ## create a new timeout list of active callbacks index = 0 for index in range(len(self.timeout_list)): if not self.timeout_list[index].is_expired(current_time): index = index - 1 break ## off-by-one crap! if index < 0: return [] ## slice off the timed-out callbacks timeout_list = self.timeout_list[:index+1] self.timeout_list = self.timeout_list[index+1:] return timeout_listclass ThreadedMainLoop(MainLoop): def __init__(self): MainLoop.__init__(self) ## mutex/locks for internal data self.timeout_lock = thread.allocate_lock() self.idle_lock = thread.allocate_lock() self.read_lock = thread.allocate_lock() self.write_lock = thread.allocate_lock() ## pipe for waking up a select()ing thread self.wakeup_needed = 0 self.wakeup_needed_lock = thread.allocate_lock() self.init_wakeup() def init_wakeup(self): rfd, wfd = os.pipe() self.thread_wakeup_read = os.fdopen(rfd, 'rb', 0) self.thread_wakeup_write = os.fdopen(wfd, 'wb', 0) self.add_read_cb(self.thread_wakeup_read, self.wakeup_cb, None) def wakeup_cb(self, thread_wakeup_read): self.wakeup_needed_lock.acquire(1) self.thread_wakeup_read.read(1) self.wakeup_needed_lock.release() def wakeup(self): self.wakeup_needed_lock.acquire(1) if self.wakeup_needed: self.thread_wakeup_write.write('Z') self.wakeup_needed = 0 self.wakeup_needed_lock.release() def main(self): MainLoop.main(self) ## now remove the wakeup sockets so the mainloop will ## be de-referenced and be garbage collected self.remove_read_cb(self.thread_wakeup_read) def done(self): MainLoop.done(self) self.wakeup() def add_timeout_cb(self, seconds, callback, data): self.timeout_lock.acquire(1) MainLoop.add_timeout_cb(self, seconds, callback, data) self.timeout_lock.release() self.wakeup() def remove_timeout_cb(self, id): self.timeout_lock.acquire(1) MainLoop.remove_timeout_cb(self, id) self.timeout_lock.release() def add_read_cb(self, sock, callback, data): self.read_lock.acquire(1) MainLoop.add_read_cb(self, sock, callback, data) self.read_lock.release() self.wakeup() def remove_read_cb(self, sock): self.read_lock.acquire(1) MainLoop.remove_read_cb(self, sock) self.read_lock.release() def add_write_cb(self, sock, callback, data): self.write_lock.acquire(1) MainLoop.add_write_cb(self, sock, callback, data) self.write_lock.release() self.wakeup() def remove_write_cb(self, sock): self.write_lock.acquire(1) MainLoop.remove_write_cb(self, sock) self.write_lock.release() def calculate_next_timeout(self): self.timeout_lock.acquire(1) seconds = MainLoop.calculate_next_timeout(self) self.timeout_lock.release() return seconds def select(self, seconds): self.read_lock.acquire(1) read_list = self.read_list[:] self.read_lock.release() self.write_lock.acquire(1) write_list = self.write_list[:] self.write_lock.release() return select.select(read_list, write_list, [], seconds) def main_loop_iteration(self): ## at this point, a thread wakeup will most likely be needed self.wakeup_needed_lock.acquire(1) self.wakeup_needed = 1 self.wakeup_needed_lock.release() callback_list = self.main_loop_poll() ## no wakeup needs to occur during dispatch self.wakeup_needed_lock.acquire(1) self.wakeup_needed = 0 self.wakeup_needed_lock.release() self.main_loop_dispatch(callback_list) def find_bad_sockets(self): self.read_lock.acquire(1) self.write_lock.acquire(1) callback_list = MainLoop.find_bad_sockets(self) self.read_lock.release() self.write_lock.release() return callback_list def get_expired_timeouts(self): self.timeout_lock.acquire(1) callback_list = MainLoop.get_expired_timeouts(self) self.timeout_lock.release() return callback_list## *&#$ NT/Win32 can't select on files, so the pipe() mechanism doesn't## work...class Win32ThreadedMainLoop(ThreadedMainLoop): def init_wakeup(self): ## bind a server to localhost sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(0) sock.bind(("127.0.0.1", 0)) sock.listen(1) addr, port = sock.getsockname() ## create a socket to connect to the server self.thread_wakeup_write = socket.socket( socket.AF_INET, socket.SOCK_STREAM) self.thread_wakeup_write.setblocking(0) try: self.thread_wakeup_write.connect((addr, port)) except socket.error: pass ## crazy loopy while 1: try: self.thread_wakeup_read, addr = sock.accept() except socket.error: time.sleep(0.1) else: sock.close() break self.thread_wakeup_read.setblocking(0) self.add_read_cb(self.thread_wakeup_read, self.wakeup_cb, None) def wakeup_cb(self, thread_wakeup_read): self.wakeup_needed_lock.acquire(1) try: self.thread_wakeup_read.recv(1) except socket.error: pass self.wakeup_needed_lock.release() def wakeup(self): self.wakeup_needed_lock.acquire(1) if self.wakeup_needed: self.thread_wakeup_write.send('Z') self.wakeup_needed = 0 self.wakeup_needed_lock.release()## use the correct class of MainLoop depending on platform type_mainloop_class = Noneif os.name == 'posix': _mainloop_class = ThreadedMainLoopelif os.name == 'nt' or os.name == 'dos': _mainloop_class = Win32ThreadedMainLoopelif os.name == 'mac': _mainloop_class = MainLoop## working instance of the network engine_engine_hash = {}_engine_hash_lock = thread.allocate_lock()def get_thread_engine(thread_ident): _engine_hash_lock.acquire(1) try: current_engine = _engine_hash[thread_ident] except KeyError: _engine_hash[thread_ident] = current_engine = _mainloop_class() _engine_hash_lock.release() return current_enginedef delete_thread_engine(thread_ident): _engine_hash_lock.acquire(1) try: del _engine_hash[thread_ident] except KeyError: pass _engine_hash_lock.release() def get_engine(): return get_thread_engine(thread.get_ident())def delete_engine(): delete_thread_engine(thread.get_ident())## mainloop APIdef main(): get_engine().main()def main_loop_iteration(): get_engine().main_loop_iteration()def done(): get_engine().done()def add_timeout_cb(seconds, callback, data = None): return get_engine().add_timeout_cb(seconds, callback, data)def remove_timeout_cb(id): get_engine().remove_timeout_cb(id)def add_idle_cb(callback, data = None): get_engine().add_timeout_cb(0, callback, data)def add_read_cb(sock, callback, data = None): get_engine().add_read_cb(sock, callback, data)def remove_read_cb(sock): get_engine().remove_read_cb(sock) def add_write_cb(sock, callback, data = None): get_engine().add_write_cb(sock, callback, data)def remove_write_cb(sock): get_engine().remove_write_cb(sock)def add_tcp_server(port, callback, data = None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('', port)) sock.listen(5) add_read_cb(sock, callback, data) return sock## scheduling on other threadsdef add_timeout_cb_on_thread(thread_ident, seconds, callback, data = None): get_thread_engine(thread_ident).add_timeout_cb(seconds, callback, data) def remove_timeout_cb_on_thread(thread_ident, id): get_thread_engine(thread_ident).remove_timeout_cb(id) def add_idle_cb_on_thread(thread_ident, callback, data = None): get_thread_engine(thread_ident).add_timeout_cb(0, callback, data) def add_read_cb_on_thread(thread_ident, sock, callback, data = None): get_thread_engine(thread_ident).add_read_cb(sock, callback, data)def remove_read_cb_on_thread(thread_ident, sock): get_thread_engine(thread_ident).remove_read_cb(sock) def add_write_cb_on_thread(thread_ident, sock, callback, data = None): get_thread_engine(thread_ident).add_write_cb(sock, callback, data)def remove_write_cb_on_thread(thread_ident, sock): get_thread_engine(thread_ident).remove_write_cb(sock)def add_tcp_server_on_thread(thread_ident, port, callback, data = None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('', port)) sock.listen(5) add_read_cb_on_thread(thread_ident, sock, callback, data) return sock
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -