test_asynchat.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 59 行

PY
59
字号
# test asynchat -- requires threading
#
# Smoke test for asynchat.async_chat: a one-shot echo server runs in a
# background thread, an async_chat client sends it a newline-terminated
# message, and the client prints whatever is echoed back.
#
# NOTE: this script targets Python 2 (str-based socket data, the ``thread``
# module).  ``print`` is called with a single parenthesized argument so the
# emitted text is unchanged on Python 2 while the file stays parseable by
# newer interpreters.

import thread  # If this fails, we can't test this module
import asyncore
import asynchat
import socket
import threading
import time

HOST = "127.0.0.1"
PORT = 54321


class echo_server(threading.Thread):
    """Thread that serves exactly one connection and echoes one line back."""

    def run(self):
        """Accept one client, read until a newline arrives, echo it, and exit.

        Both the accepted connection and the listening socket are closed on
        every exit path, including when a socket call raises.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Allow immediate re-binding of PORT on back-to-back runs.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((HOST, PORT))
            sock.listen(1)
            conn, _client_addr = sock.accept()
            try:
                # Read in small 10-byte chunks until the terminator shows up
                # (or the peer closes the connection).
                pending = ""
                while "\n" not in pending:
                    data = conn.recv(10)
                    if not data:
                        break
                    pending = pending + data
                # send() may write only part of the data; keep sending the
                # remainder until everything has gone out.
                while pending:
                    n = conn.send(pending)
                    pending = pending[n:]
            finally:
                conn.close()
        finally:
            sock.close()


class echo_client(asynchat.async_chat):
    """async_chat channel that connects to the echo server and reads one line."""

    def __init__(self):
        """Create the socket, start connecting to (HOST, PORT), and reset state."""
        asynchat.async_chat.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, PORT))
        # A newline marks the end of one message.
        self.set_terminator("\n")
        self.buffer = ""

    def handle_connect(self):
        """Report that the connection has been established."""
        print("Connected")

    def collect_incoming_data(self, data):
        """Append a chunk of received data to the pending message."""
        self.buffer = self.buffer + data

    def found_terminator(self):
        """Print the completed message, clear the buffer, and close the channel."""
        # Single pre-formatted argument: same output as the Python 2 statement
        # form ``print "Received:", repr(...)``.
        print("Received: %s" % repr(self.buffer))
        self.buffer = ""
        self.close()


def main():
    """Start the echo server, send "hello world\\n" through a client, and
    run the asyncore loop."""
    s = echo_server()
    s.start()
    time.sleep(1)  # Give server time to initialize
    c = echo_client()
    # Pushed in two pieces; the terminator only arrives with the second push.
    c.push("hello ")
    c.push("world\n")
    asyncore.loop()


# Run unconditionally at import time, as the original script does.
main()

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?