⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cascade.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
import testenv; testenv.configure_for_tests()from sqlalchemy import *from sqlalchemy import exceptionsfrom sqlalchemy.orm import *from sqlalchemy.ext.sessioncontext import SessionContextfrom testlib import *import testlib.tables as tablesclass O2MCascadeTest(TestBase, AssertsExecutionResults):    def tearDown(self):        tables.delete()    def tearDownAll(self):        clear_mappers()        tables.drop()    def setUpAll(self):        global data        tables.create()        mapper(tables.User, tables.users, properties = dict(            address = relation(mapper(tables.Address, tables.addresses), lazy=True, uselist = False, cascade="all, delete-orphan"),            orders = relation(                mapper(tables.Order, tables.orders, properties = dict (                    items = relation(mapper(tables.Item, tables.orderitems), lazy=True, uselist =True, cascade="all, delete-orphan")                )),                lazy = True, uselist = True, cascade="all, delete-orphan")        ))    def setUp(self):        global data        data = [tables.User,            {'user_name' : 'ed',                'address' : (tables.Address, {'email_address' : 'foo@bar.com'}),                'orders' : (tables.Order, [                    {'description' : 'eds 1st order', 'items' : (tables.Item, [{'item_name' : 'eds o1 item'}, {'item_name' : 'eds other o1 item'}])},                    {'description' : 'eds 2nd order', 'items' : (tables.Item, [{'item_name' : 'eds o2 item'}, {'item_name' : 'eds other o2 item'}])}                 ])            },            {'user_name' : 'jack',                'address' : (tables.Address, {'email_address' : 'jack@jack.com'}),                'orders' : (tables.Order, [                    {'description' : 'jacks 1st order', 'items' : (tables.Item, [{'item_name' : 'im a lumberjack'}, {'item_name' : 'and im ok'}])}                 ])            },            {'user_name' : 'foo',                'address' : (tables.Address, {'email_address': 'hi@lala.com'}),                'orders' : (tables.Order, [                    {'description' : 'foo order', 'items' : (tables.Item, [])},                    {'description' : 'foo order 2', 'items' : (tables.Item, [{'item_name' : 'hi'}])},                    {'description' : 'foo order three', 'items' : (tables.Item, [{'item_name' : 'there'}])}                ])            }        ]        sess = create_session()        for elem in data[1:]:            u = tables.User()            sess.save(u)            u.user_name = elem['user_name']            u.address = tables.Address()            u.address.email_address = elem['address'][1]['email_address']            u.orders = []            for order in elem['orders'][1]:                o = tables.Order()                o.isopen = None                o.description = order['description']                u.orders.append(o)                o.items = []                for item in order['items'][1]:                    i = tables.Item()                    i.item_name = item['item_name']                    o.items.append(i)        sess.flush()        sess.clear()    def testassignlist(self):        sess = create_session()        u = tables.User()        u.user_name = 'jack'        o1 = tables.Order()        o1.description ='someorder'        o2 = tables.Order()        o2.description = 'someotherorder'        l = [o1, o2]        sess.save(u)        u.orders = l        assert o1 in sess        assert o2 in sess        sess.flush()        sess.clear()        u = sess.query(tables.User).get(u.user_id)        o3 = tables.Order()        o3.description='order3'        o4 = tables.Order()        o4.description = 'order4'        u.orders = [o3, o4]        assert o3 in sess        assert o4 in sess        sess.flush()        o5 = tables.Order()        o5.description='order5'        sess.save(o5)        try:            sess.flush()            assert False        except exceptions.FlushError, e:            assert "is an orphan" in str(e)    def testdelete(self):        sess = create_session()        l = sess.query(tables.User).all()        for u in l:            print repr(u.orders)        self.assert_result(l, data[0], *data[1:])        ids = (l[0].user_id, l[2].user_id)        sess.delete(l[0])        sess.delete(l[2])        sess.flush()        assert tables.orders.count(tables.orders.c.user_id.in_(ids)).scalar() == 0        assert tables.orderitems.count(tables.orders.c.user_id.in_(ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0        assert tables.addresses.count(tables.addresses.c.user_id.in_(ids)).scalar() == 0        assert tables.users.count(tables.users.c.user_id.in_(ids)).scalar() == 0    def testdelete2(self):        """test that unloaded collections are still included in a delete-cascade by default."""        sess = create_session()        u = sess.query(tables.User).filter_by(user_name='ed').one()        # assert 'addresses' collection not loaded        assert 'addresses' not in u.__dict__        sess.delete(u)        sess.flush()        assert tables.addresses.count(tables.addresses.c.email_address=='foo@bar.com').scalar() == 0        assert tables.orderitems.count(tables.orderitems.c.item_name.like('eds%')).scalar() == 0    def testcascadecollection(self):        """test that cascade only reaches instances that are still part of the collection,        not those that have been removed"""        sess = create_session()        u = tables.User()        u.user_name = 'newuser'        o = tables.Order()        o.description = "some description"        u.orders.append(o)        sess.save(u)        sess.flush()        u.orders.remove(o)        sess.delete(u)        assert u in sess.deleted        assert o not in sess.deleted    def testorphan(self):        sess = create_session()        l = sess.query(tables.User).all()        jack = l[1]        jack.orders[:] = []        ids = [jack.user_id]        self.assert_(tables.orders.count(tables.orders.c.user_id.in_(ids)).scalar() == 1)        self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 2)        sess.flush()        self.assert_(tables.orders.count(tables.orders.c.user_id.in_(ids)).scalar() == 0)        self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0)class M2OCascadeTest(TestBase, AssertsExecutionResults):    def tearDown(self):        ctx.current.clear()        for t in metadata.table_iterator(reverse=True):            t.delete().execute()    def tearDownAll(self):        clear_mappers()        metadata.drop_all()    @testing.uses_deprecated('SessionContext')    def setUpAll(self):        global ctx, data, metadata, User, Pref, Extra        ctx = SessionContext(create_session)        metadata = MetaData(testing.db)        extra = Table("extra", metadata,            Column("extra_id", Integer, Sequence("extra_id_seq", optional=True), primary_key=True),            Column("prefs_id", Integer, ForeignKey("prefs.prefs_id"))        )        prefs = Table('prefs', metadata,            Column('prefs_id', Integer, Sequence('prefs_id_seq', optional=True), primary_key=True),            Column('prefs_data', String(40)))        users = Table('users', metadata,            Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True),            Column('user_name', String(40)),            Column('pref_id', Integer, ForeignKey('prefs.prefs_id'))        )        class User(object):            def __init__(self, name):                self.user_name = name        class Pref(object):            def __init__(self, data):                self.prefs_data = data        class Extra(object):            pass        metadata.create_all()        mapper(Extra, extra)        mapper(Pref, prefs, properties=dict(            extra = relation(Extra, cascade="all, delete")        ))        mapper(User, users, properties = dict(            pref = relation(Pref, lazy=False, cascade="all, delete-orphan")        ))    def setUp(self):        u1 = User("ed")        u1.pref = Pref("pref 1")        u2 = User("jack")        u2.pref = Pref("pref 2")        u3 = User("foo")        u3.pref = Pref("pref 3")        u1.pref.extra.append(Extra())        u2.pref.extra.append(Extra())        u2.pref.extra.append(Extra())        ctx.current.save(u1)        ctx.current.save(u2)        ctx.current.save(u3)        ctx.current.flush()        ctx.current.clear()    @testing.fails_on('maxdb')    def testorphan(self):        jack = ctx.current.query(User).filter_by(user_name='jack').one()        p = jack.pref        e = jack.pref.extra[0]        jack.pref = None        ctx.current.flush()        assert p not in ctx.current        assert e not in ctx.current    @testing.fails_on('maxdb')    def testorphan2(self):        jack = ctx.current.query(User).filter_by(user_name='jack').one()        p = jack.pref        e = jack.pref.extra[0]        ctx.current.clear()        jack.pref = None        ctx.current.update(jack)        ctx.current.update(p)        ctx.current.update(e)        assert p in ctx.current        assert e in ctx.current        ctx.current.flush()        assert p not in ctx.current        assert e not in ctx.current    def testorphan3(self):        """test that double assignment doesn't accidentally reset the 'parent' flag."""        jack = ctx.current.query(User).filter_by(user_name='jack').one()        newpref = Pref("newpref")        jack.pref = newpref        jack.pref = newpref        ctx.current.flush()class M2MCascadeTest(TestBase, AssertsExecutionResults):    def setUpAll(self):        global metadata, a, b, atob        metadata = MetaData(testing.db)        a = Table('a', metadata,            Column('id', Integer, primary_key=True),            Column('data', String(30))            )        b = Table('b', metadata,            Column('id', Integer, primary_key=True),            Column('data', String(30))            )        atob = Table('atob', metadata,            Column('aid', Integer, ForeignKey('a.id')),            Column('bid', Integer, ForeignKey('b.id'))            )        metadata.create_all()    def tearDownAll(self):        metadata.drop_all()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -