⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transaction.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
        # test that a TLTransaction opened off a TLConnection allows that        # TLConnection to be aware of the transactional context        conn = tlengine.contextual_connect()        trans = conn.begin()        conn.execute(users.insert(), user_id=1, user_name='user1')        conn.execute(users.insert(), user_id=2, user_name='user2')        conn.execute(users.insert(), user_id=3, user_name='user3')        trans.rollback()        external_connection = tlengine.connect()        result = external_connection.execute("select * from query_users")        try:            assert len(result.fetchall()) == 0        finally:            external_connection.close()    def testmorerollback_off_conn(self):        # test that an existing TLConnection automatically takes place in a TLTransaction        # opened on a second TLConnection        conn = tlengine.contextual_connect()        conn2 = tlengine.contextual_connect()        trans = conn2.begin()        conn.execute(users.insert(), user_id=1, user_name='user1')        conn.execute(users.insert(), user_id=2, user_name='user2')        conn.execute(users.insert(), user_id=3, user_name='user3')        trans.rollback()        external_connection = tlengine.connect()        result = external_connection.execute("select * from query_users")        try:            assert len(result.fetchall()) == 0        finally:            external_connection.close()    def testcommit_off_conn(self):        conn = tlengine.contextual_connect()        trans = conn.begin()        conn.execute(users.insert(), user_id=1, user_name='user1')        conn.execute(users.insert(), user_id=2, user_name='user2')        conn.execute(users.insert(), user_id=3, user_name='user3')        trans.commit()        external_connection = tlengine.connect()        result = external_connection.execute("select * from query_users")        try:            assert len(result.fetchall()) == 3        finally:            external_connection.close()    @testing.unsupported('sqlite')    @testing.exclude('mysql', '<', (5, 0, 3))    def testnesting(self):        """tests nesting of transactions"""        external_connection = tlengine.connect()        self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)        tlengine.begin()        tlengine.execute(users.insert(), user_id=1, user_name='user1')        tlengine.execute(users.insert(), user_id=2, user_name='user2')        tlengine.execute(users.insert(), user_id=3, user_name='user3')        tlengine.begin()        tlengine.execute(users.insert(), user_id=4, user_name='user4')        tlengine.execute(users.insert(), user_id=5, user_name='user5')        tlengine.commit()        tlengine.rollback()        try:            self.assert_(external_connection.scalar("select count(1) from query_users") == 0)        finally:            external_connection.close()    @testing.exclude('mysql', '<', (5, 0, 3))    def testmixednesting(self):        """tests nesting of transactions off the TLEngine directly inside of        tranasctions off the connection from the TLEngine"""        external_connection = tlengine.connect()        self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)        conn = tlengine.contextual_connect()        trans = conn.begin()        trans2 = conn.begin()        tlengine.execute(users.insert(), user_id=1, user_name='user1')        tlengine.execute(users.insert(), user_id=2, user_name='user2')        tlengine.execute(users.insert(), user_id=3, user_name='user3')        tlengine.begin()        tlengine.execute(users.insert(), user_id=4, user_name='user4')        tlengine.begin()        tlengine.execute(users.insert(), user_id=5, user_name='user5')        tlengine.execute(users.insert(), user_id=6, user_name='user6')        tlengine.execute(users.insert(), user_id=7, user_name='user7')        tlengine.commit()        tlengine.execute(users.insert(), user_id=8, user_name='user8')        tlengine.commit()        trans2.commit()        trans.rollback()        conn.close()        try:            self.assert_(external_connection.scalar("select count(1) from query_users") == 0)        finally:            external_connection.close()    @testing.exclude('mysql', '<', (5, 0, 3))    def testmoremixednesting(self):        """tests nesting of transactions off the connection from the TLEngine        inside of tranasctions off thbe TLEngine directly."""        external_connection = tlengine.connect()        self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)        tlengine.begin()        connection = tlengine.contextual_connect()        connection.execute(users.insert(), user_id=1, user_name='user1')        tlengine.begin()        connection.execute(users.insert(), user_id=2, user_name='user2')        connection.execute(users.insert(), user_id=3, user_name='user3')        trans = connection.begin()        connection.execute(users.insert(), user_id=4, user_name='user4')        connection.execute(users.insert(), user_id=5, user_name='user5')        trans.commit()        tlengine.commit()        tlengine.rollback()        connection.close()        try:            self.assert_(external_connection.scalar("select count(1) from query_users") == 0)        finally:            external_connection.close()    @testing.exclude('mysql', '<', (5, 0, 3))    def testsessionnesting(self):        class User(object):            pass        try:            mapper(User, users)            sess = create_session(bind=tlengine)            tlengine.begin()            u = User()            sess.save(u)            sess.flush()            tlengine.commit()        finally:            clear_mappers()    def testconnections(self):        """tests that contextual_connect is threadlocal"""        c1 = tlengine.contextual_connect()        c2 = tlengine.contextual_connect()        assert c1.connection is c2.connection        c2.close()        assert c1.connection.connection is not Noneclass ForUpdateTest(TestBase):    def setUpAll(self):        global counters, metadata        metadata = MetaData()        counters = Table('forupdate_counters', metadata,            Column('counter_id', INT, primary_key = True),            Column('counter_value', INT),            test_needs_acid=True,        )        counters.create(testing.db)    def tearDown(self):        testing.db.connect().execute(counters.delete())    def tearDownAll(self):        counters.drop(testing.db)    def increment(self, count, errors, update_style=True, delay=0.005):        con = testing.db.connect()        sel = counters.select(for_update=update_style,                              whereclause=counters.c.counter_id==1)        for i in xrange(count):            trans = con.begin()            try:                existing = con.execute(sel).fetchone()                incr = existing['counter_value'] + 1                time.sleep(delay)                con.execute(counters.update(counters.c.counter_id==1,                                            values={'counter_value':incr}))                time.sleep(delay)                readback = con.execute(sel).fetchone()                if (readback['counter_value'] != incr):                    raise AssertionError("Got %s post-update, expected %s" %                                         (readback['counter_value'], incr))                trans.commit()            except Exception, e:                trans.rollback()                errors.append(e)                break        con.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')    def testqueued_update(self):        """Test SELECT FOR UPDATE with concurrent modifications.        Runs concurrent modifications on a single row in the users table,        with each mutator trying to increment a value stored in user_name.        """        db = testing.db        db.execute(counters.insert(), counter_id=1, counter_value=0)        iterations, thread_count = 10, 5        threads, errors = [], []        for i in xrange(thread_count):            thread = threading.Thread(target=self.increment,                                      args=(iterations,),                                      kwargs={'errors': errors,                                              'update_style': True})            thread.start()            threads.append(thread)        for thread in threads:            thread.join()        for e in errors:            sys.stderr.write("Failure: %s\n" % e)        self.assert_(len(errors) == 0)        sel = counters.select(whereclause=counters.c.counter_id==1)        final = db.execute(sel).fetchone()        self.assert_(final['counter_value'] == iterations * thread_count)    def overlap(self, ids, errors, update_style):        sel = counters.select(for_update=update_style,                              whereclause=counters.c.counter_id.in_(ids))        con = testing.db.connect()        trans = con.begin()        try:            rows = con.execute(sel).fetchall()            time.sleep(0.25)            trans.commit()        except Exception, e:            trans.rollback()            errors.append(e)        con.close()    def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):        db = testing.db        for cid in range(pool - 1):            db.execute(counters.insert(), counter_id=cid + 1, counter_value=0)        errors, threads = [], []        for i in xrange(thread_count):            thread = threading.Thread(target=self.overlap,                                      args=(groups.pop(0), errors, update_style))            thread.start()            threads.append(thread)        for thread in threads:            thread.join()        return errors    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')    def testqueued_select(self):        """Simple SELECT FOR UPDATE conflict test"""        errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)])        for e in errors:            sys.stderr.write("Failure: %s\n" % e)        self.assert_(len(errors) == 0)    @testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird',                         'sybase', 'access')    def testnowait_select(self):        """Simple SELECT FOR UPDATE NOWAIT conflict test"""        errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],                                        update_style='nowait')        self.assert_(len(errors) != 0)if __name__ == "__main__":    testenv.main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -