⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pool.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
        p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)        c1 = p.connect()        c_id = c1.connection.id        c1.close(); c1=None        c1 = p.connect()        assert c1.connection.id == c_id        dbapi.raise_error = True        c1.invalidate()        c1 = None        c1 = p.connect()        assert c1.connection.id != c_id    def test_detach(self):        dbapi = MockDBAPI()        p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)        c1 = p.connect()        c1.detach()        c_id = c1.connection.id        c2 = p.connect()        assert c2.connection.id != c1.connection.id        dbapi.raise_error = True        c2.invalidate()        c2 = None        c2 = p.connect()        assert c2.connection.id != c1.connection.id        con = c1.connection        assert not con.closed        c1.close()        assert con.closed    def test_threadfairy(self):        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)        c1 = p.connect()        c1.close()        c2 = p.connect()        assert c2.connection is not None    def testthreadlocal_del(self):        self._do_testthreadlocal(useclose=False)    def testthreadlocal_close(self):        self._do_testthreadlocal(useclose=True)    def _do_testthreadlocal(self, useclose=False):        for p in (            pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True),            pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True)        ):            c1 = p.connect()            c2 = p.connect()            self.assert_(c1 is c2)            c3 = p.unique_connection()            self.assert_(c3 is not c1)            if useclose:                c2.close()            else:                c2 = None            c2 = p.connect()            self.assert_(c1 is c2)            self.assert_(c3 is not c1)            if useclose:                c2.close()            else:                c2 = None            if useclose:                c1 = p.connect()                c2 = p.connect()                c3 = p.connect()                c3.close()                c2.close()                self.assert_(c1.connection is not None)                c1.close()            c1 = c2 = c3 = None            # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced            if isinstance(p, pool.QueuePool):                self.assert_(p.checkedout() == 0)                c1 = p.connect()                c2 = p.connect()                if useclose:                    c2.close()                    c1.close()                else:                    c2 = None                    c1 = None                self.assert_(p.checkedout() == 0)    def test_properties(self):        dbapi = MockDBAPI()        p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),                           pool_size=1, max_overflow=0, use_threadlocal=False)        c = p.connect()        self.assert_(not c.info)        self.assert_(c.info is c._connection_record.info)        c.info['foo'] = 'bar'        c.close()        del c        c = p.connect()        self.assert_('foo' in c.info)        c.invalidate()        c = p.connect()        self.assert_('foo' not in c.info)        c.info['foo2'] = 'bar2'        c.detach()        self.assert_('foo2' in c.info)        c2 = p.connect()        self.assert_(c.connection is not c2.connection)        self.assert_(not c2.info)        self.assert_('foo2' in c.info)    def test_listeners(self):        dbapi = MockDBAPI()        class InstrumentingListener(object):            def __init__(self):                if hasattr(self, 'connect'):                    self.connect = self.inst_connect                if hasattr(self, 'checkout'):                    self.checkout = self.inst_checkout                if hasattr(self, 'checkin'):                    self.checkin = self.inst_checkin                self.clear()            def clear(self):                self.connected = []                self.checked_out = []                self.checked_in = []            def assert_total(innerself, conn, cout, cin):                self.assert_(len(innerself.connected) == conn)                self.assert_(len(innerself.checked_out) == cout)                self.assert_(len(innerself.checked_in) == cin)            def assert_in(innerself, item, in_conn, in_cout, in_cin):                self.assert_((item in innerself.connected) == in_conn)                self.assert_((item in innerself.checked_out) == in_cout)                self.assert_((item in innerself.checked_in) == in_cin)            def inst_connect(self, con, record):                print "connect(%s, %s)" % (con, record)                assert con is not None                assert record is not None                self.connected.append(con)            def inst_checkout(self, con, record, proxy):                print "checkout(%s, %s, %s)" % (con, record, proxy)                assert con is not None                assert record is not None                assert proxy is not None                self.checked_out.append(con)            def inst_checkin(self, con, record):                print "checkin(%s, %s)" % (con, record)                # con can be None if invalidated                assert record is not None                self.checked_in.append(con)        class ListenAll(interfaces.PoolListener, InstrumentingListener):            pass        class ListenConnect(InstrumentingListener):            def connect(self, con, record):                pass        class ListenCheckOut(InstrumentingListener):            def checkout(self, con, record, proxy, num):                pass        class ListenCheckIn(InstrumentingListener):            def checkin(self, con, proxy, record):                pass        def _pool(**kw):            return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), use_threadlocal=False, **kw)            #, pool_size=1, max_overflow=0, **kw)        def assert_listeners(p, total, conn, cout, cin):            for instance in (p, p.recreate()):                self.assert_(len(instance.listeners) == total)                self.assert_(len(instance._on_connect) == conn)                self.assert_(len(instance._on_checkout) == cout)                self.assert_(len(instance._on_checkin) == cin)        p = _pool()        assert_listeners(p, 0, 0, 0, 0)        p.add_listener(ListenAll())        assert_listeners(p, 1, 1, 1, 1)        p.add_listener(ListenConnect())        assert_listeners(p, 2, 2, 1, 1)        p.add_listener(ListenCheckOut())        assert_listeners(p, 3, 2, 2, 1)        p.add_listener(ListenCheckIn())        assert_listeners(p, 4, 2, 2, 2)        del p        print "----"        snoop = ListenAll()        p = _pool(listeners=[snoop])        assert_listeners(p, 1, 1, 1, 1)        c = p.connect()        snoop.assert_total(1, 1, 0)        cc = c.connection        snoop.assert_in(cc, True, True, False)        c.close()        snoop.assert_in(cc, True, True, True)        del c, cc        snoop.clear()        # this one depends on immediate gc        c = p.connect()        cc = c.connection        snoop.assert_in(cc, False, True, False)        snoop.assert_total(0, 1, 0)        del c, cc        snoop.assert_total(0, 1, 1)        p.dispose()        snoop.clear()        c = p.connect()        c.close()        c = p.connect()        snoop.assert_total(1, 2, 1)        c.close()        snoop.assert_total(1, 2, 2)        # invalidation        p.dispose()        snoop.clear()        c = p.connect()        snoop.assert_total(1, 1, 0)        c.invalidate()        snoop.assert_total(1, 1, 1)        c.close()        snoop.assert_total(1, 1, 1)        del c        snoop.assert_total(1, 1, 1)        c = p.connect()        snoop.assert_total(2, 2, 1)        c.close()        del c        snoop.assert_total(2, 2, 2)        # detached        p.dispose()        snoop.clear()        c = p.connect()        snoop.assert_total(1, 1, 0)        c.detach()        snoop.assert_total(1, 1, 0)        c.close()        del c        snoop.assert_total(1, 1, 0)        c = p.connect()        snoop.assert_total(2, 2, 0)        c.close()        del c        snoop.assert_total(2, 2, 1)    def tearDown(self):       pool.clear_managers()if __name__ == "__main__":    testenv.main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -