📄 rawserver.py
字号:
sb = RawServer(fb, 100, 100) loop(sb) sl(sb, db, 5001) sleep(.5) ca = sa.start_connection(('', 5001)) sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert len(db.external_made) == 1 cb = db.external_made[0] del db.external_made[:] assert db.data_in == [] assert db.lost == [] ca.write('aaa') cb.write('bbb') sleep(1) assert da.external_made == [] assert da.data_in == [(ca, 'bbb')] del da.data_in[:] assert da.lost == [] assert db.external_made == [] assert db.data_in == [(cb, 'aaa')] del db.data_in[:] assert db.lost == [] ca.write('ccc') cb.write('ddd') sleep(1) assert da.external_made == [] assert da.data_in == [(ca, 'ddd')] del da.data_in[:] assert da.lost == [] assert db.external_made == [] assert db.data_in == [(cb, 'ccc')] del db.data_in[:] assert db.lost == [] ca.close() sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert db.external_made == [] assert db.data_in == [] assert db.lost == [cb] del db.lost[:] finally: fa.set() fb.set()def test_receiving_side_close(): try: da = DummyHandler() fa = Event() sa = RawServer(fa, 100, 100) loop(sa) sl(sa, da, 5002) db = DummyHandler() fb = Event() sb = RawServer(fb, 100, 100) loop(sb) sl(sb, db, 5003) sleep(.5) ca = sa.start_connection(('', 5003)) sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert len(db.external_made) == 1 cb = db.external_made[0] del db.external_made[:] assert db.data_in == [] assert db.lost == [] ca.write('aaa') cb.write('bbb') sleep(1) assert da.external_made == [] assert da.data_in == [(ca, 'bbb')] del da.data_in[:] assert da.lost == [] assert db.external_made == [] assert db.data_in == [(cb, 'aaa')] del db.data_in[:] assert db.lost == [] ca.write('ccc') cb.write('ddd') sleep(1) assert da.external_made == [] assert da.data_in == [(ca, 'ddd')] del da.data_in[:] assert da.lost == [] assert db.external_made == [] assert db.data_in == [(cb, 'ccc')] del db.data_in[:] assert db.lost == [] cb.close() sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [ca] del da.lost[:] assert db.external_made == [] assert db.data_in == [] assert db.lost == [] finally: fa.set() fb.set()def test_connection_refused(): try: da = DummyHandler() fa = Event() sa = RawServer(fa, 100, 100) loop(sa) sl(sa, da, 5006) sleep(.5) ca = sa.start_connection(('', 5007)) sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [ca] del da.lost[:] finally: fa.set()def test_both_close(): try: da = DummyHandler() fa = Event() sa = RawServer(fa, 100, 100) loop(sa) sl(sa, da, 5004) sleep(1) db = DummyHandler() fb = Event() sb = RawServer(fb, 100, 100) loop(sb) sl(sb, db, 5005) sleep(.5) ca = sa.start_connection(('', 5005)) sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert len(db.external_made) == 1 cb = db.external_made[0] del db.external_made[:] assert db.data_in == [] assert db.lost == [] ca.write('aaa') cb.write('bbb') sleep(1) assert da.external_made == [] assert da.data_in == [(ca, 'bbb')] del da.data_in[:] assert da.lost == [] assert db.external_made == [] assert db.data_in == [(cb, 'aaa')] del db.data_in[:] assert db.lost == [] ca.write('ccc') cb.write('ddd') sleep(1) assert da.external_made == [] assert da.data_in == [(ca, 'ddd')] del da.data_in[:] assert da.lost == [] assert db.external_made == [] assert db.data_in == [(cb, 'ccc')] del db.data_in[:] assert db.lost == [] ca.close() cb.close() sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert db.external_made == [] assert db.data_in == [] assert db.lost == [] finally: fa.set() fb.set()def test_normal(): l = [] f = Event() s = RawServer(f, 100, 100) loop(s) sl(s, DummyHandler(), 5007) s.add_task(lambda l = l: l.append('b'), 2) s.add_task(lambda l = l: l.append('a'), 1) s.add_task(lambda l = l: l.append('d'), 4) sleep(1.5) s.add_task(lambda l = l: l.append('c'), 1.5) sleep(3) assert l == ['a', 'b', 'c', 'd'] f.set()def test_catch_exception(): l = [] f = Event() s = RawServer(f, 100, 100, false) loop(s) sl(s, DummyHandler(), 5009) s.add_task(lambda l = l: l.append('b'), 2) s.add_task(lambda: 4/0, 1) sleep(3) assert l == ['b'] f.set()def test_closes_if_not_hit(): try: da = DummyHandler() fa = Event() sa = RawServer(fa, 2, 2) loop(sa) sl(sa, da, 5012) sleep(1) db = DummyHandler() fb = Event() sb = RawServer(fb, 100, 100) loop(sb) sl(sb, db, 5013) sleep(.5) sa.start_connection(('', 5013)) sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert len(db.external_made) == 1 del db.external_made[:] assert db.data_in == [] assert db.lost == [] sleep(3.1) assert len(da.lost) == 1 assert len(db.lost) == 1 finally: fa.set() fb.set()def test_does_not_close_if_hit(): try: da = DummyHandler() fa = Event() sa = RawServer(fa, 2, 2) loop(sa) sl(sa, da, 5012) sleep(1) db = DummyHandler() fb = Event() sb = RawServer(fb, 100, 100) loop(sb) sl(sb, db, 5013) sleep(.5) sa.start_connection(('', 5013)) sleep(1) assert da.external_made == [] assert da.data_in == [] assert da.lost == [] assert len(db.external_made) == 1 cb = db.external_made[0] del db.external_made[:] assert db.data_in == [] assert db.lost == [] cb.write('bbb') sleep(2) assert da.lost == [] assert db.lost == [] finally: fa.set() fb.set()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -