⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transaction.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
        self.assertEquals(            connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),            [(1,),(2,),(5,)]        )        connection.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    # fixme: see if this is still true and/or can be convert to fails_on()    @testing.unsupported('mysql')    def testtwophaserecover(self):        # MySQL recovery doesn't currently seem to work correctly        # Prepared transactions disappear when connections are closed and even        # when they aren't it doesn't seem possible to use the recovery id.        connection = testing.db.connect()        transaction = connection.begin_twophase()        connection.execute(users.insert(), user_id=1, user_name='user1')        transaction.prepare()        connection.close()        connection2 = testing.db.connect()        self.assertEquals(            connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),            []        )        recoverables = connection2.recover_twophase()        self.assertTrue(            transaction.xid in recoverables        )        connection2.commit_prepared(transaction.xid, recover=True)        self.assertEquals(            connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),            [(1,)]        )        connection2.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @testing.exclude('mysql', '<', (5, 0, 3))    def testmultipletwophase(self):        conn = testing.db.connect()        xa = conn.begin_twophase()        conn.execute(users.insert(), user_id=1, user_name='user1')        xa.prepare()        xa.commit()        xa = conn.begin_twophase()        conn.execute(users.insert(), user_id=2, user_name='user2')        xa.prepare()        xa.rollback()        xa = conn.begin_twophase()        conn.execute(users.insert(), user_id=3, user_name='user3')        xa.rollback()        xa = conn.begin_twophase()        conn.execute(users.insert(), user_id=4, user_name='user4')        xa.prepare()        xa.commit()        result = conn.execute(select([users.c.user_name]).order_by(users.c.user_id))        self.assertEqual(result.fetchall(), [('user1',),('user4',)])        conn.close()class AutoRollbackTest(TestBase):    def setUpAll(self):        global metadata        metadata = MetaData()    def tearDownAll(self):        metadata.drop_all(testing.db)    @testing.unsupported('sqlite')    def testrollback_deadlock(self):        """test that returning connections to the pool clears any object locks."""        conn1 = testing.db.connect()        conn2 = testing.db.connect()        users = Table('deadlock_users', metadata,            Column('user_id', INT, primary_key = True),            Column('user_name', VARCHAR(20)),            test_needs_acid=True,        )        users.create(conn1)        conn1.execute("select * from deadlock_users")        conn1.close()        # without auto-rollback in the connection pool's return() logic, this        # deadlocks in Postgres, because conn1 is returned to the pool but        # still has a lock on "deadlock_users".        # comment out the rollback in pool/ConnectionFairy._close() to see !        users.drop(conn2)        conn2.close()class ExplicitAutoCommitTest(TestBase):    """test the 'autocommit' flag on select() and text() objects.          Requires Postgres so that we may define a custom function which modifies the database.    """        __only_on__ = 'postgres'    def setUpAll(self):        global metadata, foo        metadata = MetaData(testing.db)        foo = Table('foo', metadata, Column('id', Integer, primary_key=True), Column('data', String(100)))        metadata.create_all()        testing.db.execute("create function insert_foo(varchar) returns integer as 'insert into foo(data) values ($1);select 1;' language sql")    def tearDown(self):        foo.delete().execute()            def tearDownAll(self):        testing.db.execute("drop function insert_foo(varchar)")        metadata.drop_all()        def test_control(self):        # test that not using autocommit does not commit         conn1 = testing.db.connect()        conn2 = testing.db.connect()        conn1.execute(select([func.insert_foo('data1')]))        assert conn2.execute(select([foo.c.data])).fetchall() == []        conn1.execute(text("select insert_foo('moredata')"))        assert conn2.execute(select([foo.c.data])).fetchall() == []        trans = conn1.begin()        trans.commit()        assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('moredata',)]                conn1.close()        conn2.close()            def test_explicit_compiled(self):        conn1 = testing.db.connect()        conn2 = testing.db.connect()                conn1.execute(select([func.insert_foo('data1')], autocommit=True))        assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',)]        conn1.execute(select([func.insert_foo('data2')]).autocommit())        assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('data2',)]                conn1.close()        conn2.close()        def test_explicit_text(self):        conn1 = testing.db.connect()        conn2 = testing.db.connect()                conn1.execute(text("select insert_foo('moredata')", autocommit=True))        assert conn2.execute(select([foo.c.data])).fetchall() == [('moredata',)]                conn1.close()        conn2.close()    def test_implicit_text(self):        conn1 = testing.db.connect()        conn2 = testing.db.connect()                conn1.execute(text("insert into foo (data) values ('implicitdata')"))        assert conn2.execute(select([foo.c.data])).fetchall() == [('implicitdata',)]                conn1.close()        conn2.close()            class TLTransactionTest(TestBase):    def setUpAll(self):        global users, metadata, tlengine        tlengine = create_engine(testing.db.url, strategy='threadlocal')        metadata = MetaData()        users = Table('query_users', metadata,            Column('user_id', INT, Sequence('query_users_id_seq', optional=True), primary_key=True),            Column('user_name', VARCHAR(20)),            test_needs_acid=True,        )        users.create(tlengine)    def tearDown(self):        tlengine.execute(users.delete())    def tearDownAll(self):        users.drop(tlengine)        tlengine.dispose()    def test_connection_close(self):        """test that when connections are closed for real, transactions are rolled back and disposed."""        c = tlengine.contextual_connect()        c.begin()        assert tlengine.session.in_transaction()        assert hasattr(tlengine.session, '_TLSession__transaction')        assert hasattr(tlengine.session, '_TLSession__trans')        c.close()        assert not tlengine.session.in_transaction()        assert not hasattr(tlengine.session, '_TLSession__transaction')        assert not hasattr(tlengine.session, '_TLSession__trans')    def test_transaction_close(self):        c = tlengine.contextual_connect()        t = c.begin()        tlengine.execute(users.insert(), user_id=1, user_name='user1')        tlengine.execute(users.insert(), user_id=2, user_name='user2')        t2 = c.begin()        tlengine.execute(users.insert(), user_id=3, user_name='user3')        tlengine.execute(users.insert(), user_id=4, user_name='user4')        t2.close()        result = c.execute("select * from query_users")        assert len(result.fetchall()) == 4        t.close()        external_connection = tlengine.connect()        result = external_connection.execute("select * from query_users")        try:            assert len(result.fetchall()) == 0        finally:            external_connection.close()    def testrollback(self):        """test a basic rollback"""        tlengine.begin()        tlengine.execute(users.insert(), user_id=1, user_name='user1')        tlengine.execute(users.insert(), user_id=2, user_name='user2')        tlengine.execute(users.insert(), user_id=3, user_name='user3')        tlengine.rollback()        external_connection = tlengine.connect()        result = external_connection.execute("select * from query_users")        try:            assert len(result.fetchall()) == 0        finally:            external_connection.close()    def testcommit(self):        """test a basic commit"""        tlengine.begin()        tlengine.execute(users.insert(), user_id=1, user_name='user1')        tlengine.execute(users.insert(), user_id=2, user_name='user2')        tlengine.execute(users.insert(), user_id=3, user_name='user3')        tlengine.commit()        external_connection = tlengine.connect()        result = external_connection.execute("select * from query_users")        try:            assert len(result.fetchall()) == 3        finally:            external_connection.close()    def testcommits(self):        assert tlengine.connect().execute("select count(1) from query_users").scalar() == 0        connection = tlengine.contextual_connect()        transaction = connection.begin()        connection.execute(users.insert(), user_id=1, user_name='user1')        transaction.commit()        transaction = connection.begin()        connection.execute(users.insert(), user_id=2, user_name='user2')        connection.execute(users.insert(), user_id=3, user_name='user3')        transaction.commit()        transaction = connection.begin()        result = connection.execute("select * from query_users")        l = result.fetchall()        assert len(l) == 3, "expected 3 got %d" % len(l)        transaction.commit()    def testrollback_off_conn(self):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -