📄 session.py
字号:
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
from sqlalchemy import exceptions, util
from sqlalchemy.orm import *
from sqlalchemy.orm.session import SessionExtension
from sqlalchemy.orm.session import Session as SessionCls
from testlib import *
from testlib.tables import *
from testlib import fixtures, tables
import pickle
import gc


class SessionTest(TestBase, AssertsExecutionResults):
    """Tests of Session connection handling, autoflush and transaction nesting.

    Tables are created once for the class, emptied after every test, and all
    mappers are cleared after every test so each test can map its own classes.
    """

    def setUpAll(self):
        """Create the shared test tables once for the whole class."""
        tables.create()

    def tearDownAll(self):
        """Drop the shared test tables after the last test."""
        tables.drop()

    def tearDown(self):
        """Close every session, delete table rows and clear all mappers."""
        SessionCls.close_all()
        tables.delete()
        clear_mappers()

    def setUp(self):
        """No per-test setup is required."""
        pass

    def test_close(self):
        """test that flush() doesn't close a connection the session didn't open"""
        c = testing.db.connect()
        try:
            class User(object):
                pass
            mapper(User, users)

            s = create_session(bind=c)
            s.save(User())
            s.flush()
            # the externally opened connection must still be usable after flush()
            c.execute("select * from users")

            u = User()
            s.save(u)
            # set the name on the pending User (the original assigned it to the
            # session object, which had no effect on the flushed row)
            u.user_name = 'some user'
            s.flush()

            u = User()
            s.save(u)
            u.user_name = 'some other user'
            s.flush()
        finally:
            # release the connection this test opened, as test_close_two does
            c.close()

    def test_close_two(self):
        """Same as test_close, but inside an explicit session transaction.

        Also asserts that the session keeps the same transaction object across
        several flushes.
        """
        c = testing.db.connect()
        try:
            class User(object):
                pass
            mapper(User, users)

            s = create_session(bind=c)
            s.begin()
            tran = s.transaction
            s.save(User())
            s.flush()
            c.execute("select * from users")

            u = User()
            s.save(u)
            # set the name on the pending User, not on the session
            u.user_name = 'some user'
            s.flush()

            u = User()
            s.save(u)
            u.user_name = 'some other user'
            s.flush()

            assert s.transaction is tran
            tran.close()
        finally:
            c.close()

    def test_expunge_cascade(self):
        """Expunge a User whose addresses and backrefs are loaded both ways."""
        tables.data()
        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relation(Address,
                                  backref=backref("user", cascade="all"),
                                  cascade="all")
        })

        session = create_session()
        u = session.query(User).filter_by(user_id=7).one()

        # get everything to load in both directions
        # (single-argument parenthesised form prints the same under Python 2)
        print([a.user for a in u.addresses])

        # then see if expunge fails
        session.expunge(u)

    @engines.close_open_connections
    def test_binds_from_expression(self):
        """test that Session can extract Table objects from ClauseElements and match them to tables."""
        Session = sessionmaker(binds={users: testing.db, addresses: testing.db})
        sess = Session()

        sess.execute(users.insert(), params=dict(user_id=1, user_name='ed'))
        assert sess.execute(users.select()).fetchall() == [(1, 'ed')]

        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relation(Address,
                                  backref=backref("user", cascade="all"),
                                  cascade="all")
        })

        # NOTE(review): the factory is rebound to the mapped classes here, but
        # `sess` below is still the session created from the first factory --
        # confirm whether a new session was meant to be created.
        Session = sessionmaker(binds={User: testing.db, Address: testing.db})

        sess.execute(users.insert(), params=dict(user_id=2, user_name='fred'))
        assert sess.execute(users.select()).fetchall() == [(1, 'ed'), (2, 'fred')]
        sess.close()

    @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
    @engines.close_open_connections
    def test_transaction(self):
        """Flushed rows are seen on the session's connection only, until commit."""
        class User(object):
            pass
        mapper(User, users)

        conn1 = testing.db.connect()
        conn2 = testing.db.connect()

        sess = create_session(transactional=True, bind=conn1)
        u = User()
        sess.save(u)
        sess.flush()

        # before commit: visible on conn1, not on conn2
        assert conn1.execute("select count(1) from users").scalar() == 1
        assert conn2.execute("select count(1) from users").scalar() == 0

        sess.commit()

        # after commit: visible on conn1 and on a fresh connection
        assert conn1.execute("select count(1) from users").scalar() == 1
        assert testing.db.connect().execute("select count(1) from users").scalar() == 1
        sess.close()

    @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
    @engines.close_open_connections
    def test_autoflush(self):
        """With autoflush, a query returns a saved-but-unflushed object.

        The row is then visible on the session's connection but not on a
        second connection until the session commits.
        """
        class User(object):
            pass
        mapper(User, users)

        conn1 = testing.db.connect()
        conn2 = testing.db.connect()

        sess = create_session(bind=conn1, transactional=True, autoflush=True)
        u = User()
        u.user_name = 'ed'
        sess.save(u)

        # no explicit flush() before this query
        u2 = sess.query(User).filter_by(user_name='ed').one()
        assert u2 is u

        assert conn1.execute("select count(1) from users").scalar() == 1
        assert conn2.execute("select count(1) from users").scalar() == 0

        sess.commit()

        assert conn1.execute("select count(1) from users").scalar() == 1
        assert testing.db.connect().execute("select count(1) from users").scalar() == 1
        sess.close()

    @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
    @engines.close_open_connections
    def test_autoflush_unbound(self):
        """Same as test_autoflush, with a session that has no explicit bind.

        Counts inside the session are run through sess.execute() with
        mapper=User.
        """
        class User(object):
            pass
        mapper(User, users)

        # created outside the try block so the except handler below can always
        # refer to `sess` (a failure here would otherwise surface as NameError)
        sess = create_session(transactional=True, autoflush=True)
        try:
            u = User()
            u.user_name = 'ed'
            sess.save(u)

            u2 = sess.query(User).filter_by(user_name='ed').one()
            assert u2 is u

            assert sess.execute("select count(1) from users", mapper=User).scalar() == 1
            assert testing.db.connect().execute("select count(1) from users").scalar() == 0

            sess.commit()

            assert sess.execute("select count(1) from users", mapper=User).scalar() == 1
            assert testing.db.connect().execute("select count(1) from users").scalar() == 1
            sess.close()
        except:
            # roll back on any failure, then re-raise the original error
            sess.rollback()
            raise

    @engines.close_open_connections
    def test_autoflush_2(self):
        """commit() on an autoflush session persists a saved object.

        A second commit() with nothing pending is also issued.
        """
        class User(object):
            pass
        mapper(User, users)

        conn1 = testing.db.connect()
        conn2 = testing.db.connect()

        sess = create_session(bind=conn1, transactional=True, autoflush=True)
        u = User()
        u.user_name = 'ed'
        sess.save(u)
        sess.commit()

        assert conn1.execute("select count(1) from users").scalar() == 1
        assert testing.db.connect().execute("select count(1) from users").scalar() == 1
        sess.commit()

    # TODO: not doing rollback of attributes right now.
    def dont_test_autoflush_rollback(self):
        """Disabled: rollback() should restore in-memory attribute state.

        The method name does not start with ``test``; see the TODO above.
        """
        tables.data()
        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relation(Address)
        })

        sess = create_session(transactional=True, autoflush=True)
        u = sess.query(User).get(8)

        newad = Address()
        # assignment; the original used `==`, a comparison with no effect
        newad.email_address = 'something new'
        u.addresses.append(newad)
        u.user_name = 'some new name'

        assert u.user_name == 'some new name'
        assert len(u.addresses) == 4
        assert newad in u.addresses

        sess.rollback()

        assert u.user_name == 'ed'
        assert len(u.addresses) == 3
        assert newad not in u.addresses

    @engines.close_open_connections
    def test_external_joined_transaction(self):
        """A session joins a transaction already begun on its connection.

        The session's commit() persists nothing; rolling back the outer
        transaction discards the flushed row.
        """
        class User(object):
            pass
        mapper(User, users)

        conn = testing.db.connect()
        trans = conn.begin()

        sess = create_session(bind=conn, transactional=True, autoflush=True)
        sess.begin()
        u = User()
        sess.save(u)
        sess.flush()

        sess.commit()  # commit does nothing
        trans.rollback()  # rolls back

        assert len(sess.query(User).all()) == 0
        sess.close()

    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')
    @engines.close_open_connections
    def test_external_nested_transaction(self):
        """Rolling back a nested transaction keeps the outer transaction's row.

        One user is flushed in the external transaction and a second inside
        begin_nested(); after rollback and the outer commit, one user remains.
        """
        class User(object):
            pass
        mapper(User, users)

        # opened outside the try block so the except handler below can always
        # refer to `conn` (a failure here would otherwise surface as NameError)
        conn = testing.db.connect()
        try:
            trans = conn.begin()
            sess = create_session(bind=conn, transactional=True, autoflush=True)

            u1 = User()
            sess.save(u1)
            sess.flush()

            sess.begin_nested()
            u2 = User()
            sess.save(u2)
            sess.flush()
            sess.rollback()

            trans.commit()
            assert len(sess.query(User).all()) == 1
        except:
            conn.close()
            raise

    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')
    @engines.close_open_connections
    def test_heavy_nesting(self):
        """Mix begin() and begin_nested(); roll back only the innermost level.

        The row inserted after begin_nested() is removed by rollback(), while
        the row inserted before it is kept.
        """
        session = create_session(bind=testing.db)

        session.begin()
        session.connection().execute("insert into users (user_name) values ('user1')")

        session.begin()
        session.begin_nested()
        session.connection().execute("insert into users (user_name) values ('user2')")
        assert session.connection().execute("select count(1) from users").scalar() == 2

        session.rollback()
        assert session.connection().execute("select count(1) from users").scalar() == 1

        session.connection().execute("insert into users (user_name) values ('user3')")
        session.commit()
        assert session.connection().execute("select count(1) from users").scalar() == 2

    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')
    @testing.exclude('mysql', '<', (5, 0, 3))
    def test_twophase(self):
        """Commit a twophase session whose mappers are bound to two engines."""
        # TODO: mock up a failure condition here
        # to ensure a rollback succeeds
        class User(object):
            pass
        class Address(object):
            pass
        mapper(User, users)
        mapper(Address, addresses)

        # second engine created from the same database URL
        engine2 = create_engine(testing.db.url)
        sess = create_session(transactional=False, autoflush=False, twophase=True)
        sess.bind_mapper(User, testing.db)
        sess.bind_mapper(Address, engine2)

        sess.begin()
        u1 = User()
        a1 = Address()
        sess.save(u1)
        sess.save(a1)
        sess.commit()
        sess.close()
        engine2.dispose()

        assert users.count().scalar() == 1
        assert addresses.count().scalar() == 1

    def test_joined_transaction(self):
        """begin() on a transactional session joins its existing transaction.

        The matching commit() persists nothing, and the following rollback()
        discards the flushed row.
        """
        class User(object):
            pass
        mapper(User, users)

        sess = create_session(transactional=True, autoflush=True)
        sess.begin()
        u = User()
        sess.save(u)
        sess.flush()

        sess.commit()  # commit does nothing
        sess.rollback()  # rolls back

        assert len(sess.query(User).all()) == 0
        sess.close()

    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')
    @testing.exclude('mysql', '<', (5, 0, 3))
    def test_nested_transaction(self):
        """rollback() after begin_nested() discards only the nested work."""
        class User(object):
            pass
        mapper(User, users)

        sess = create_session()
        sess.begin()

        u = User()
        sess.save(u)
        sess.flush()

        sess.begin_nested()  # nested transaction

        u2 = User()
        sess.save(u2)
        sess.flush()

        sess.rollback()

        sess.commit()
        assert len(sess.query(User).all()) == 1
        sess.close()

    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')
    @testing.exclude('mysql', '<', (5, 0, 3))
    def test_nested_autotrans(self):
        """Same as test_nested_transaction, using a transactional session.

        No explicit begin() is issued before the first flush.
        """
        class User(object):
            pass
        mapper(User, users)

        sess = create_session(transactional=True)
        u = User()
        sess.save(u)
        sess.flush()

        sess.begin_nested()  # nested transaction

        u2 = User()
        sess.save(u2)
        sess.flush()

        sess.rollback()

        sess.commit()
        assert len(sess.query(User).all()) == 1
        sess.close()

    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')
    @testing.exclude('mysql', '<', (5, 0, 3))
    def test_nested_transaction_connection_add(self):
        # NOTE(review): only the opening statements of this test are visible
        # in this excerpt; they are reproduced unchanged and nothing beyond
        # them has been reconstructed.
        class User(object):
            pass
        mapper(User, users)

        sess = create_session(transactional=False)
        sess.begin()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -