📄 transaction.py
字号:
# test that a TLTransaction opened off a TLConnection allows that # TLConnection to be aware of the transactional context conn = tlengine.contextual_connect() trans = conn.begin() conn.execute(users.insert(), user_id=1, user_name='user1') conn.execute(users.insert(), user_id=2, user_name='user2') conn.execute(users.insert(), user_id=3, user_name='user3') trans.rollback() external_connection = tlengine.connect() result = external_connection.execute("select * from query_users") try: assert len(result.fetchall()) == 0 finally: external_connection.close() def testmorerollback_off_conn(self): # test that an existing TLConnection automatically takes place in a TLTransaction # opened on a second TLConnection conn = tlengine.contextual_connect() conn2 = tlengine.contextual_connect() trans = conn2.begin() conn.execute(users.insert(), user_id=1, user_name='user1') conn.execute(users.insert(), user_id=2, user_name='user2') conn.execute(users.insert(), user_id=3, user_name='user3') trans.rollback() external_connection = tlengine.connect() result = external_connection.execute("select * from query_users") try: assert len(result.fetchall()) == 0 finally: external_connection.close() def testcommit_off_conn(self): conn = tlengine.contextual_connect() trans = conn.begin() conn.execute(users.insert(), user_id=1, user_name='user1') conn.execute(users.insert(), user_id=2, user_name='user2') conn.execute(users.insert(), user_id=3, user_name='user3') trans.commit() external_connection = tlengine.connect() result = external_connection.execute("select * from query_users") try: assert len(result.fetchall()) == 3 finally: external_connection.close() @testing.unsupported('sqlite') @testing.exclude('mysql', '<', (5, 0, 3)) def testnesting(self): """tests nesting of transactions""" external_connection = tlengine.connect() self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.begin() tlengine.execute(users.insert(), user_id=4, user_name='user4') tlengine.execute(users.insert(), user_id=5, user_name='user5') tlengine.commit() tlengine.rollback() try: self.assert_(external_connection.scalar("select count(1) from query_users") == 0) finally: external_connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testmixednesting(self): """tests nesting of transactions off the TLEngine directly inside of tranasctions off the connection from the TLEngine""" external_connection = tlengine.connect() self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) conn = tlengine.contextual_connect() trans = conn.begin() trans2 = conn.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.begin() tlengine.execute(users.insert(), user_id=4, user_name='user4') tlengine.begin() tlengine.execute(users.insert(), user_id=5, user_name='user5') tlengine.execute(users.insert(), user_id=6, user_name='user6') tlengine.execute(users.insert(), user_id=7, user_name='user7') tlengine.commit() tlengine.execute(users.insert(), user_id=8, user_name='user8') tlengine.commit() trans2.commit() trans.rollback() conn.close() try: self.assert_(external_connection.scalar("select count(1) from query_users") == 0) finally: external_connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testmoremixednesting(self): """tests nesting of transactions off the connection from the TLEngine inside of tranasctions off thbe TLEngine directly.""" external_connection = tlengine.connect() self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) tlengine.begin() connection = tlengine.contextual_connect() connection.execute(users.insert(), user_id=1, user_name='user1') tlengine.begin() connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') trans.commit() tlengine.commit() tlengine.rollback() connection.close() try: self.assert_(external_connection.scalar("select count(1) from query_users") == 0) finally: external_connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) def testsessionnesting(self): class User(object): pass try: mapper(User, users) sess = create_session(bind=tlengine) tlengine.begin() u = User() sess.save(u) sess.flush() tlengine.commit() finally: clear_mappers() def testconnections(self): """tests that contextual_connect is threadlocal""" c1 = tlengine.contextual_connect() c2 = tlengine.contextual_connect() assert c1.connection is c2.connection c2.close() assert c1.connection.connection is not Noneclass ForUpdateTest(TestBase): def setUpAll(self): global counters, metadata metadata = MetaData() counters = Table('forupdate_counters', metadata, Column('counter_id', INT, primary_key = True), Column('counter_value', INT), test_needs_acid=True, ) counters.create(testing.db) def tearDown(self): testing.db.connect().execute(counters.delete()) def tearDownAll(self): counters.drop(testing.db) def increment(self, count, errors, update_style=True, delay=0.005): con = testing.db.connect() sel = counters.select(for_update=update_style, whereclause=counters.c.counter_id==1) for i in xrange(count): trans = con.begin() try: existing = con.execute(sel).fetchone() incr = existing['counter_value'] + 1 time.sleep(delay) con.execute(counters.update(counters.c.counter_id==1, values={'counter_value':incr})) time.sleep(delay) readback = con.execute(sel).fetchone() if (readback['counter_value'] != incr): raise AssertionError("Got %s post-update, expected %s" % (readback['counter_value'], incr)) trans.commit() except Exception, e: trans.rollback() errors.append(e) break con.close() @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') def testqueued_update(self): """Test SELECT FOR UPDATE with concurrent modifications. Runs concurrent modifications on a single row in the users table, with each mutator trying to increment a value stored in user_name. """ db = testing.db db.execute(counters.insert(), counter_id=1, counter_value=0) iterations, thread_count = 10, 5 threads, errors = [], [] for i in xrange(thread_count): thread = threading.Thread(target=self.increment, args=(iterations,), kwargs={'errors': errors, 'update_style': True}) thread.start() threads.append(thread) for thread in threads: thread.join() for e in errors: sys.stderr.write("Failure: %s\n" % e) self.assert_(len(errors) == 0) sel = counters.select(whereclause=counters.c.counter_id==1) final = db.execute(sel).fetchone() self.assert_(final['counter_value'] == iterations * thread_count) def overlap(self, ids, errors, update_style): sel = counters.select(for_update=update_style, whereclause=counters.c.counter_id.in_(ids)) con = testing.db.connect() trans = con.begin() try: rows = con.execute(sel).fetchall() time.sleep(0.25) trans.commit() except Exception, e: trans.rollback() errors.append(e) con.close() def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5): db = testing.db for cid in range(pool - 1): db.execute(counters.insert(), counter_id=cid + 1, counter_value=0) errors, threads = [], [] for i in xrange(thread_count): thread = threading.Thread(target=self.overlap, args=(groups.pop(0), errors, update_style)) thread.start() threads.append(thread) for thread in threads: thread.join() return errors @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') def testqueued_select(self): """Simple SELECT FOR UPDATE conflict test""" errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)]) for e in errors: sys.stderr.write("Failure: %s\n" % e) self.assert_(len(errors) == 0) @testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird', 'sybase', 'access') def testnowait_select(self): """Simple SELECT FOR UPDATE NOWAIT conflict test""" errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)], update_style='nowait') self.assert_(len(errors) != 0)if __name__ == "__main__": testenv.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -