📄 transaction.py
字号:
import testenv; testenv.configure_for_tests()import sys, time, threadingfrom sqlalchemy import *from sqlalchemy.orm import *from testlib import *class TransactionTest(TestBase): def setUpAll(self): global users, metadata metadata = MetaData() users = Table('query_users', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), test_needs_acid=True, ) users.create(testing.db) def tearDown(self): testing.db.connect().execute(users.delete()) def tearDownAll(self): users.drop(testing.db) def testcommits(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.commit() transaction = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() transaction = connection.begin() result = connection.execute("select * from query_users") assert len(result.fetchall()) == 3 transaction.commit() def testrollback(self): """test a basic rollback""" connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() def testraise(self): connection = testing.db.connect() transaction = connection.begin() try: connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=1, user_name='user3') transaction.commit() assert False except Exception , e: print "Exception: ", e transaction.rollback() result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testnestedrollback(self): connection = testing.db.connect() try: transaction = connection.begin() try: connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() try: connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') raise Exception("uh oh") trans2.commit() except: trans2.rollback() raise transaction.rollback() except Exception, e: transaction.rollback() raise except Exception, e: try: assert str(e) == 'uh oh' # and not "This transaction is inactive" finally: connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testnesting(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') trans2.commit() transaction.rollback() self.assert_(connection.scalar("select count(1) from query_users") == 0) result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testclose(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') assert connection.in_transaction() trans2.close() assert connection.in_transaction() transaction.commit() assert not connection.in_transaction() self.assert_(connection.scalar("select count(1) from query_users") == 5) result = connection.execute("select * from query_users") assert len(result.fetchall()) == 5 connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testclose2(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') assert connection.in_transaction() trans2.close() assert connection.in_transaction() transaction.close() assert not connection.in_transaction() self.assert_(connection.scalar("select count(1) from query_users") == 0) result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') @testing.exclude('mysql', '<', (5, 0, 3)) def testnestedsubtransactionrollback(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans2 = connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans2.rollback() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(3,)] ) connection.close() @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') @testing.exclude('mysql', '<', (5, 0, 3)) def testnestedsubtransactioncommit(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans2 = connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans2.commit() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(2,),(3,)] ) connection.close() @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') @testing.exclude('mysql', '<', (5, 0, 3)) def testrollbacktosubtransaction(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans2 = connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans3 = connection.begin() connection.execute(users.insert(), user_id=3, user_name='user3') trans3.rollback() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.commit() self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(4,)] ) connection.close() @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def testtwophasetransaction(self): connection = testing.db.connect() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() transaction.commit() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=2, user_name='user2') transaction.commit() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.prepare() transaction.rollback() self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(2,)] ) connection.close() @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def testmixedtwophasetransaction(self): connection = testing.db.connect() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction2 = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') transaction3 = connection.begin_nested() connection.execute(users.insert(), user_id=3, user_name='user3') transaction4 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') transaction4.commit() transaction3.rollback() connection.execute(users.insert(), user_id=5, user_name='user5') transaction2.commit() transaction.prepare() transaction.commit()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -