test_asynchat.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 59 行
PY
59 行
# test asynchat -- requires threading
#
# Starts a one-shot echo server on a background thread, connects an
# asynchat-based client to it, pushes "hello world\n" in two pieces and
# prints what comes back.  The test runs at import time (see the bottom of
# the file).

import thread  # If this fails, we can't test this module
import asyncore
import asynchat
import socket
import threading
import time

HOST = "127.0.0.1"
PORT = 54321


class echo_server(threading.Thread):
    """Thread that accepts a single connection and echoes one line back.

    The ``ready`` event is set once the listening socket is bound and
    listening, so a client can wait for it instead of sleeping.
    """

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.ready = threading.Event()

    def run(self):
        """Accept one client, read until a newline (or EOF), echo it back."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((HOST, PORT))
            sock.listen(1)
            # From here on a connect() from the client will be queued.
            self.ready.set()
            conn, _peer = sock.accept()
            try:
                received = ""
                # Read in small chunks until a full line has arrived.
                while "\n" not in received:
                    data = conn.recv(10)
                    if not data:
                        # Peer closed the connection before sending "\n".
                        break
                    received = received + data
                # send() may write only part of the data; loop until done.
                while received:
                    sent = conn.send(received)
                    received = received[sent:]
            finally:
                conn.close()
        finally:
            sock.close()


class echo_client(asynchat.async_chat):
    """async_chat client that collects one newline-terminated reply."""

    def __init__(self):
        asynchat.async_chat.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, PORT))
        self.set_terminator("\n")
        self.buffer = ""

    def handle_connect(self):
        """Report that the connection to the server was established."""
        print("Connected")

    def collect_incoming_data(self, data):
        """Append a chunk of received data to the pending reply."""
        self.buffer = self.buffer + data

    def found_terminator(self):
        """Print the collected reply, reset the buffer and close."""
        print("Received: %s" % repr(self.buffer))
        self.buffer = ""
        self.close()


def main():
    """Run the echo server and client once, driving the asyncore loop."""
    s = echo_server()
    s.start()
    # Wait until the server is actually listening instead of sleeping for a
    # fixed interval; the timeout only bounds the wait if start-up fails.
    s.ready.wait(5)
    c = echo_client()
    c.push("hello ")
    c.push("world\n")
    asyncore.loop()


# The test is executed when the module is imported or run.
main()
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?