⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
import testenv; testenv.configure_for_tests()from sqlalchemy import *from sqlalchemy import exceptions, utilfrom sqlalchemy.orm import *from sqlalchemy.orm.session import SessionExtensionfrom sqlalchemy.orm.session import Session as SessionClsfrom testlib import *from testlib.tables import *from testlib import fixtures, tablesimport pickleimport gcclass SessionTest(TestBase, AssertsExecutionResults):    def setUpAll(self):        tables.create()    def tearDownAll(self):        tables.drop()    def tearDown(self):        SessionCls.close_all()        tables.delete()        clear_mappers()    def setUp(self):        pass    def test_close(self):        """test that flush() doenst close a connection the session didnt open"""        c = testing.db.connect()        class User(object):pass        mapper(User, users)        s = create_session(bind=c)        s.save(User())        s.flush()        c.execute("select * from users")        u = User()        s.save(u)        s.user_name = 'some user'        s.flush()        u = User()        s.save(u)        s.user_name = 'some other user'        s.flush()    def test_close_two(self):        c = testing.db.connect()        try:            class User(object):pass            mapper(User, users)            s = create_session(bind=c)            s.begin()            tran = s.transaction            s.save(User())            s.flush()            c.execute("select * from users")            u = User()            s.save(u)            s.user_name = 'some user'            s.flush()            u = User()            s.save(u)            s.user_name = 'some other user'            s.flush()            assert s.transaction is tran            tran.close()        finally:            c.close()    def test_expunge_cascade(self):        tables.data()        mapper(Address, addresses)        mapper(User, users, properties={            'addresses':relation(Address, backref=backref("user", cascade="all"), cascade="all")        })        session = create_session()        u = session.query(User).filter_by(user_id=7).one()        # get everything to load in both directions        print [a.user for a in u.addresses]        # then see if expunge fails        session.expunge(u)    @engines.close_open_connections    def test_binds_from_expression(self):        """test that Session can extract Table objects from ClauseElements and match them to tables."""        Session = sessionmaker(binds={users:testing.db, addresses:testing.db})        sess = Session()        sess.execute(users.insert(), params=dict(user_id=1, user_name='ed'))        assert sess.execute(users.select()).fetchall() == [(1, 'ed')]        mapper(Address, addresses)        mapper(User, users, properties={            'addresses':relation(Address, backref=backref("user", cascade="all"), cascade="all")        })        Session = sessionmaker(binds={User:testing.db, Address:testing.db})        sess.execute(users.insert(), params=dict(user_id=2, user_name='fred'))        assert sess.execute(users.select()).fetchall() == [(1, 'ed'), (2, 'fred')]        sess.close()    @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang    @engines.close_open_connections    def test_transaction(self):        class User(object):pass        mapper(User, users)        conn1 = testing.db.connect()        conn2 = testing.db.connect()        sess = create_session(transactional=True, bind=conn1)        u = User()        sess.save(u)        sess.flush()        assert conn1.execute("select count(1) from users").scalar() == 1        assert conn2.execute("select count(1) from users").scalar() == 0        sess.commit()        assert conn1.execute("select count(1) from users").scalar() == 1        assert testing.db.connect().execute("select count(1) from users").scalar() == 1        sess.close()    @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang    @engines.close_open_connections    def test_autoflush(self):        class User(object):pass        mapper(User, users)        conn1 = testing.db.connect()        conn2 = testing.db.connect()        sess = create_session(bind=conn1, transactional=True, autoflush=True)        u = User()        u.user_name='ed'        sess.save(u)        u2 = sess.query(User).filter_by(user_name='ed').one()        assert u2 is u        assert conn1.execute("select count(1) from users").scalar() == 1        assert conn2.execute("select count(1) from users").scalar() == 0        sess.commit()        assert conn1.execute("select count(1) from users").scalar() == 1        assert testing.db.connect().execute("select count(1) from users").scalar() == 1        sess.close()    @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang    @engines.close_open_connections    def test_autoflush_unbound(self):        class User(object):pass        mapper(User, users)        try:            sess = create_session(transactional=True, autoflush=True)            u = User()            u.user_name='ed'            sess.save(u)            u2 = sess.query(User).filter_by(user_name='ed').one()            assert u2 is u            assert sess.execute("select count(1) from users", mapper=User).scalar() == 1            assert testing.db.connect().execute("select count(1) from users").scalar() == 0            sess.commit()            assert sess.execute("select count(1) from users", mapper=User).scalar() == 1            assert testing.db.connect().execute("select count(1) from users").scalar() == 1            sess.close()        except:            sess.rollback()            raise    @engines.close_open_connections    def test_autoflush_2(self):        class User(object):pass        mapper(User, users)        conn1 = testing.db.connect()        conn2 = testing.db.connect()        sess = create_session(bind=conn1, transactional=True, autoflush=True)        u = User()        u.user_name='ed'        sess.save(u)        sess.commit()        assert conn1.execute("select count(1) from users").scalar() == 1        assert testing.db.connect().execute("select count(1) from users").scalar() == 1        sess.commit()    # TODO: not doing rollback of attributes right now.    def dont_test_autoflush_rollback(self):        tables.data()        mapper(Address, addresses)        mapper(User, users, properties={            'addresses':relation(Address)        })        sess = create_session(transactional=True, autoflush=True)        u = sess.query(User).get(8)        newad = Address()        newad.email_address == 'something new'        u.addresses.append(newad)        u.user_name = 'some new name'        assert u.user_name == 'some new name'        assert len(u.addresses) == 4        assert newad in u.addresses        sess.rollback()        assert u.user_name == 'ed'        assert len(u.addresses) == 3        assert newad not in u.addresses    @engines.close_open_connections    def test_external_joined_transaction(self):        class User(object):pass        mapper(User, users)        conn = testing.db.connect()        trans = conn.begin()        sess = create_session(bind=conn, transactional=True, autoflush=True)        sess.begin()        u = User()        sess.save(u)        sess.flush()        sess.commit() # commit does nothing        trans.rollback() # rolls back        assert len(sess.query(User).all()) == 0        sess.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @engines.close_open_connections    def test_external_nested_transaction(self):        class User(object):pass        mapper(User, users)        try:            conn = testing.db.connect()            trans = conn.begin()            sess = create_session(bind=conn, transactional=True, autoflush=True)            u1 = User()            sess.save(u1)            sess.flush()            sess.begin_nested()            u2 = User()            sess.save(u2)            sess.flush()            sess.rollback()            trans.commit()            assert len(sess.query(User).all()) == 1        except:            conn.close()            raise    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @engines.close_open_connections    def test_heavy_nesting(self):        session = create_session(bind=testing.db)        session.begin()        session.connection().execute("insert into users (user_name) values ('user1')")        session.begin()        session.begin_nested()        session.connection().execute("insert into users (user_name) values ('user2')")        assert session.connection().execute("select count(1) from users").scalar() == 2        session.rollback()        assert session.connection().execute("select count(1) from users").scalar() == 1        session.connection().execute("insert into users (user_name) values ('user3')")        session.commit()        assert session.connection().execute("select count(1) from users").scalar() == 2    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @testing.exclude('mysql', '<', (5, 0, 3))    def test_twophase(self):        # TODO: mock up a failure condition here        # to ensure a rollback succeeds        class User(object):pass        class Address(object):pass        mapper(User, users)        mapper(Address, addresses)        engine2 = create_engine(testing.db.url)        sess = create_session(transactional=False, autoflush=False, twophase=True)        sess.bind_mapper(User, testing.db)        sess.bind_mapper(Address, engine2)        sess.begin()        u1 = User()        a1 = Address()        sess.save(u1)        sess.save(a1)        sess.commit()        sess.close()        engine2.dispose()        assert users.count().scalar() == 1        assert addresses.count().scalar() == 1    def test_joined_transaction(self):        class User(object):pass        mapper(User, users)        sess = create_session(transactional=True, autoflush=True)        sess.begin()        u = User()        sess.save(u)        sess.flush()        sess.commit() # commit does nothing        sess.rollback() # rolls back        assert len(sess.query(User).all()) == 0        sess.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @testing.exclude('mysql', '<', (5, 0, 3))    def test_nested_transaction(self):        class User(object):pass        mapper(User, users)        sess = create_session()        sess.begin()        u = User()        sess.save(u)        sess.flush()        sess.begin_nested()  # nested transaction        u2 = User()        sess.save(u2)        sess.flush()        sess.rollback()        sess.commit()        assert len(sess.query(User).all()) == 1        sess.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @testing.exclude('mysql', '<', (5, 0, 3))    def test_nested_autotrans(self):        class User(object):pass        mapper(User, users)        sess = create_session(transactional=True)        u = User()        sess.save(u)        sess.flush()        sess.begin_nested()  # nested transaction        u2 = User()        sess.save(u2)        sess.flush()        sess.rollback()        sess.commit()        assert len(sess.query(User).all()) == 1        sess.close()    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',                         'oracle', 'maxdb')    @testing.exclude('mysql', '<', (5, 0, 3))    def test_nested_transaction_connection_add(self):        class User(object): pass        mapper(User, users)                sess = create_session(transactional=False)                sess.begin()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -