⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rawserver.py

📁 BitTorrentABC-Linux-V.2.4.3源码
💻 PY
📖 第 1 页 / 共 2 页
字号:
        sb = RawServer(fb, 100, 100)        loop(sb)        sl(sb, db, 5001)        sleep(.5)        ca = sa.start_connection(('', 5001))        sleep(1)                assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert len(db.external_made) == 1        cb = db.external_made[0]        del db.external_made[:]        assert db.data_in == []        assert db.lost == []        ca.write('aaa')        cb.write('bbb')        sleep(1)                assert da.external_made == []        assert da.data_in == [(ca, 'bbb')]        del da.data_in[:]        assert da.lost == []        assert db.external_made == []        assert db.data_in == [(cb, 'aaa')]        del db.data_in[:]        assert db.lost == []        ca.write('ccc')        cb.write('ddd')        sleep(1)                assert da.external_made == []        assert da.data_in == [(ca, 'ddd')]        del da.data_in[:]        assert da.lost == []        assert db.external_made == []        assert db.data_in == [(cb, 'ccc')]        del db.data_in[:]        assert db.lost == []        ca.close()        sleep(1)        assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert db.external_made == []        assert db.data_in == []        assert db.lost == [cb]        del db.lost[:]    finally:        fa.set()        fb.set()def test_receiving_side_close():    try:        da = DummyHandler()        fa = Event()        sa = RawServer(fa, 100, 100)        loop(sa)        sl(sa, da, 5002)        db = DummyHandler()        fb = Event()        sb = RawServer(fb, 100, 100)        loop(sb)        sl(sb, db, 5003)                sleep(.5)        ca = sa.start_connection(('', 5003))        sleep(1)                assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert len(db.external_made) == 1        cb = db.external_made[0]        del db.external_made[:]        assert db.data_in == []        assert db.lost == []        ca.write('aaa')        cb.write('bbb')        sleep(1)                assert da.external_made == []        assert da.data_in == [(ca, 'bbb')]        del da.data_in[:]        assert da.lost == []        assert db.external_made == []        assert db.data_in == [(cb, 'aaa')]        del db.data_in[:]        assert db.lost == []        ca.write('ccc')        cb.write('ddd')        sleep(1)                assert da.external_made == []        assert da.data_in == [(ca, 'ddd')]        del da.data_in[:]        assert da.lost == []        assert db.external_made == []        assert db.data_in == [(cb, 'ccc')]        del db.data_in[:]        assert db.lost == []        cb.close()        sleep(1)        assert da.external_made == []        assert da.data_in == []        assert da.lost == [ca]        del da.lost[:]        assert db.external_made == []        assert db.data_in == []        assert db.lost == []    finally:        fa.set()        fb.set()def test_connection_refused():    try:        da = DummyHandler()        fa = Event()        sa = RawServer(fa, 100, 100)        loop(sa)        sl(sa, da, 5006)        sleep(.5)        ca = sa.start_connection(('', 5007))        sleep(1)                assert da.external_made == []        assert da.data_in == []        assert da.lost == [ca]        del da.lost[:]    finally:        fa.set()def test_both_close():    try:        da = DummyHandler()        fa = Event()        sa = RawServer(fa, 100, 100)        loop(sa)        sl(sa, da, 5004)        sleep(1)        db = DummyHandler()        fb = Event()        sb = RawServer(fb, 100, 100)        loop(sb)        sl(sb, db, 5005)        sleep(.5)        ca = sa.start_connection(('', 5005))        sleep(1)                assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert len(db.external_made) == 1        cb = db.external_made[0]        del db.external_made[:]        assert db.data_in == []        assert db.lost == []        ca.write('aaa')        cb.write('bbb')        sleep(1)                assert da.external_made == []        assert da.data_in == [(ca, 'bbb')]        del da.data_in[:]        assert da.lost == []        assert db.external_made == []        assert db.data_in == [(cb, 'aaa')]        del db.data_in[:]        assert db.lost == []        ca.write('ccc')        cb.write('ddd')        sleep(1)                assert da.external_made == []        assert da.data_in == [(ca, 'ddd')]        del da.data_in[:]        assert da.lost == []        assert db.external_made == []        assert db.data_in == [(cb, 'ccc')]        del db.data_in[:]        assert db.lost == []        ca.close()        cb.close()        sleep(1)        assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert db.external_made == []        assert db.data_in == []        assert db.lost == []    finally:        fa.set()        fb.set()def test_normal():    l = []    f = Event()    s = RawServer(f, 100, 100)    loop(s)    sl(s, DummyHandler(), 5007)    s.add_task(lambda l = l: l.append('b'), 2)    s.add_task(lambda l = l: l.append('a'), 1)    s.add_task(lambda l = l: l.append('d'), 4)    sleep(1.5)    s.add_task(lambda l = l: l.append('c'), 1.5)    sleep(3)    assert l == ['a', 'b', 'c', 'd']    f.set()def test_catch_exception():    l = []    f = Event()    s = RawServer(f, 100, 100, false)    loop(s)    sl(s, DummyHandler(), 5009)    s.add_task(lambda l = l: l.append('b'), 2)    s.add_task(lambda: 4/0, 1)    sleep(3)    assert l == ['b']    f.set()def test_closes_if_not_hit():    try:        da = DummyHandler()        fa = Event()        sa = RawServer(fa, 2, 2)        loop(sa)        sl(sa, da, 5012)        sleep(1)        db = DummyHandler()        fb = Event()        sb = RawServer(fb, 100, 100)        loop(sb)        sl(sb, db, 5013)                sleep(.5)        sa.start_connection(('', 5013))        sleep(1)                assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert len(db.external_made) == 1        del db.external_made[:]        assert db.data_in == []        assert db.lost == []        sleep(3.1)                assert len(da.lost) == 1        assert len(db.lost) == 1    finally:        fa.set()        fb.set()def test_does_not_close_if_hit():    try:        da = DummyHandler()        fa = Event()        sa = RawServer(fa, 2, 2)        loop(sa)        sl(sa, da, 5012)        sleep(1)        db = DummyHandler()        fb = Event()        sb = RawServer(fb, 100, 100)        loop(sb)        sl(sb, db, 5013)                sleep(.5)        sa.start_connection(('', 5013))        sleep(1)                assert da.external_made == []        assert da.data_in == []        assert da.lost == []        assert len(db.external_made) == 1        cb = db.external_made[0]        del db.external_made[:]        assert db.data_in == []        assert db.lost == []        cb.write('bbb')        sleep(2)                assert da.lost == []        assert db.lost == []    finally:        fa.set()        fb.set()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -