📄 pool.py
字号:
p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) c1 = p.connect() c_id = c1.connection.id c1.close(); c1=None c1 = p.connect() assert c1.connection.id == c_id dbapi.raise_error = True c1.invalidate() c1 = None c1 = p.connect() assert c1.connection.id != c_id def test_detach(self): dbapi = MockDBAPI() p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) c1 = p.connect() c1.detach() c_id = c1.connection.id c2 = p.connect() assert c2.connection.id != c1.connection.id dbapi.raise_error = True c2.invalidate() c2 = None c2 = p.connect() assert c2.connection.id != c1.connection.id con = c1.connection assert not con.closed c1.close() assert con.closed def test_threadfairy(self): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True) c1 = p.connect() c1.close() c2 = p.connect() assert c2.connection is not None def testthreadlocal_del(self): self._do_testthreadlocal(useclose=False) def testthreadlocal_close(self): self._do_testthreadlocal(useclose=True) def _do_testthreadlocal(self, useclose=False): for p in ( pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True), pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True) ): c1 = p.connect() c2 = p.connect() self.assert_(c1 is c2) c3 = p.unique_connection() self.assert_(c3 is not c1) if useclose: c2.close() else: c2 = None c2 = p.connect() self.assert_(c1 is c2) self.assert_(c3 is not c1) if useclose: c2.close() else: c2 = None if useclose: c1 = p.connect() c2 = p.connect() c3 = p.connect() c3.close() c2.close() self.assert_(c1.connection is not None) c1.close() c1 = c2 = c3 = None # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced if isinstance(p, pool.QueuePool): self.assert_(p.checkedout() == 0) c1 = p.connect() c2 = p.connect() if useclose: c2.close() c1.close() else: c2 = None c1 = None self.assert_(p.checkedout() == 0) def test_properties(self): dbapi = MockDBAPI() p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), pool_size=1, max_overflow=0, use_threadlocal=False) c = p.connect() self.assert_(not c.info) self.assert_(c.info is c._connection_record.info) c.info['foo'] = 'bar' c.close() del c c = p.connect() self.assert_('foo' in c.info) c.invalidate() c = p.connect() self.assert_('foo' not in c.info) c.info['foo2'] = 'bar2' c.detach() self.assert_('foo2' in c.info) c2 = p.connect() self.assert_(c.connection is not c2.connection) self.assert_(not c2.info) self.assert_('foo2' in c.info) def test_listeners(self): dbapi = MockDBAPI() class InstrumentingListener(object): def __init__(self): if hasattr(self, 'connect'): self.connect = self.inst_connect if hasattr(self, 'checkout'): self.checkout = self.inst_checkout if hasattr(self, 'checkin'): self.checkin = self.inst_checkin self.clear() def clear(self): self.connected = [] self.checked_out = [] self.checked_in = [] def assert_total(innerself, conn, cout, cin): self.assert_(len(innerself.connected) == conn) self.assert_(len(innerself.checked_out) == cout) self.assert_(len(innerself.checked_in) == cin) def assert_in(innerself, item, in_conn, in_cout, in_cin): self.assert_((item in innerself.connected) == in_conn) self.assert_((item in innerself.checked_out) == in_cout) self.assert_((item in innerself.checked_in) == in_cin) def inst_connect(self, con, record): print "connect(%s, %s)" % (con, record) assert con is not None assert record is not None self.connected.append(con) def inst_checkout(self, con, record, proxy): print "checkout(%s, %s, %s)" % (con, record, proxy) assert con is not None assert record is not None assert proxy is not None self.checked_out.append(con) def inst_checkin(self, con, record): print "checkin(%s, %s)" % (con, record) # con can be None if invalidated assert record is not None self.checked_in.append(con) class ListenAll(interfaces.PoolListener, InstrumentingListener): pass class ListenConnect(InstrumentingListener): def connect(self, con, record): pass class ListenCheckOut(InstrumentingListener): def checkout(self, con, record, proxy, num): pass class ListenCheckIn(InstrumentingListener): def checkin(self, con, proxy, record): pass def _pool(**kw): return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), use_threadlocal=False, **kw) #, pool_size=1, max_overflow=0, **kw) def assert_listeners(p, total, conn, cout, cin): for instance in (p, p.recreate()): self.assert_(len(instance.listeners) == total) self.assert_(len(instance._on_connect) == conn) self.assert_(len(instance._on_checkout) == cout) self.assert_(len(instance._on_checkin) == cin) p = _pool() assert_listeners(p, 0, 0, 0, 0) p.add_listener(ListenAll()) assert_listeners(p, 1, 1, 1, 1) p.add_listener(ListenConnect()) assert_listeners(p, 2, 2, 1, 1) p.add_listener(ListenCheckOut()) assert_listeners(p, 3, 2, 2, 1) p.add_listener(ListenCheckIn()) assert_listeners(p, 4, 2, 2, 2) del p print "----" snoop = ListenAll() p = _pool(listeners=[snoop]) assert_listeners(p, 1, 1, 1, 1) c = p.connect() snoop.assert_total(1, 1, 0) cc = c.connection snoop.assert_in(cc, True, True, False) c.close() snoop.assert_in(cc, True, True, True) del c, cc snoop.clear() # this one depends on immediate gc c = p.connect() cc = c.connection snoop.assert_in(cc, False, True, False) snoop.assert_total(0, 1, 0) del c, cc snoop.assert_total(0, 1, 1) p.dispose() snoop.clear() c = p.connect() c.close() c = p.connect() snoop.assert_total(1, 2, 1) c.close() snoop.assert_total(1, 2, 2) # invalidation p.dispose() snoop.clear() c = p.connect() snoop.assert_total(1, 1, 0) c.invalidate() snoop.assert_total(1, 1, 1) c.close() snoop.assert_total(1, 1, 1) del c snoop.assert_total(1, 1, 1) c = p.connect() snoop.assert_total(2, 2, 1) c.close() del c snoop.assert_total(2, 2, 2) # detached p.dispose() snoop.clear() c = p.connect() snoop.assert_total(1, 1, 0) c.detach() snoop.assert_total(1, 1, 0) c.close() del c snoop.assert_total(1, 1, 0) c = p.connect() snoop.assert_total(2, 2, 0) c.close() del c snoop.assert_total(2, 2, 1) def tearDown(self): pool.clear_managers()if __name__ == "__main__": testenv.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -