⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 attributes.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
import testenv; testenv.configure_for_tests()import pickleimport sqlalchemy.orm.attributes as attributesfrom sqlalchemy.orm.collections import collectionfrom sqlalchemy import exceptionsfrom testlib import *from testlib import fixturesROLLBACK_SUPPORTED=False# these test classes defined at the module# level to support picklingclass MyTest(object):passclass MyTest2(object):passclass AttributesTest(TestBase):    def test_basic(self):        class User(object):pass        attributes.register_class(User)        attributes.register_attribute(User, 'user_id', uselist = False, useobject=False)        attributes.register_attribute(User, 'user_name', uselist = False, useobject=False)        attributes.register_attribute(User, 'email_address', uselist = False, useobject=False)        u = User()        u.user_id = 7        u.user_name = 'john'        u.email_address = 'lala@123.com'        self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')        u._state.commit_all()        self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')        u.user_name = 'heythere'        u.email_address = 'foo@bar.com'        self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.email_address == 'foo@bar.com')    def test_pickleness(self):        attributes.register_class(MyTest)        attributes.register_class(MyTest2)        attributes.register_attribute(MyTest, 'user_id', uselist = False, useobject=False)        attributes.register_attribute(MyTest, 'user_name', uselist = False, useobject=False)        attributes.register_attribute(MyTest, 'email_address', uselist = False, useobject=False)        attributes.register_attribute(MyTest2, 'a', uselist = False, useobject=False)        attributes.register_attribute(MyTest2, 'b', uselist = False, useobject=False)        # shouldnt be pickling callables at the class level        def somecallable(*args):            return None        attr_name = 'mt2'        attributes.register_attribute(MyTest, attr_name, uselist = True, trackparent=True, callable_=somecallable, useobject=True)        o = MyTest()        o.mt2.append(MyTest2())        o.user_id=7        o.mt2[0].a = 'abcde'        pk_o = pickle.dumps(o)        o2 = pickle.loads(pk_o)        pk_o2 = pickle.dumps(o2)        # so... pickle is creating a new 'mt2' string after a roundtrip here,        # so we'll brute-force set it to be id-equal to the original string        if False:            o_mt2_str = [ k for k in o.__dict__ if k == 'mt2'][0]            o2_mt2_str = [ k for k in o2.__dict__ if k == 'mt2'][0]            self.assert_(o_mt2_str == o2_mt2_str)            self.assert_(o_mt2_str is not o2_mt2_str)            # change the id of o2.__dict__['mt2']            former = o2.__dict__['mt2']            del o2.__dict__['mt2']            o2.__dict__[o_mt2_str] = former            self.assert_(pk_o == pk_o2)        # the above is kind of distrurbing, so let's do it again a little        # differently.  the string-id in serialization thing is just an        # artifact of pickling that comes up in the first round-trip.        # a -> b differs in pickle memoization of 'mt2', but b -> c will        # serialize identically.        o3 = pickle.loads(pk_o2)        pk_o3 = pickle.dumps(o3)        o4 = pickle.loads(pk_o3)        pk_o4 = pickle.dumps(o4)        self.assert_(pk_o3 == pk_o4)        # and lastly make sure we still have our data after all that.        # identical serialzation is great, *if* it's complete :)        self.assert_(o4.user_id == 7)        self.assert_(o4.user_name is None)        self.assert_(o4.email_address is None)        self.assert_(len(o4.mt2) == 1)        self.assert_(o4.mt2[0].a == 'abcde')        self.assert_(o4.mt2[0].b is None)    def test_deferred(self):        class Foo(object):pass        data = {'a':'this is a', 'b':12}        def loader(instance, keys):            for k in keys:                instance.__dict__[k] = data[k]            return attributes.ATTR_WAS_SET        attributes.register_class(Foo, deferred_scalar_loader=loader)        attributes.register_attribute(Foo, 'a', uselist=False, useobject=False)        attributes.register_attribute(Foo, 'b', uselist=False, useobject=False)        f = Foo()        f._state.expire_attributes(None)        self.assertEquals(f.a, "this is a")        self.assertEquals(f.b, 12)        f.a = "this is some new a"        f._state.expire_attributes(None)        self.assertEquals(f.a, "this is a")        self.assertEquals(f.b, 12)        f._state.expire_attributes(None)        f.a = "this is another new a"        self.assertEquals(f.a, "this is another new a")        self.assertEquals(f.b, 12)        f._state.expire_attributes(None)        self.assertEquals(f.a, "this is a")        self.assertEquals(f.b, 12)        del f.a        self.assertEquals(f.a, None)        self.assertEquals(f.b, 12)        f._state.commit_all()        self.assertEquals(f.a, None)        self.assertEquals(f.b, 12)    def test_deferred_pickleable(self):        data = {'a':'this is a', 'b':12}        def loader(instance, keys):            for k in keys:                instance.__dict__[k] = data[k]            return attributes.ATTR_WAS_SET        attributes.register_class(MyTest, deferred_scalar_loader=loader)        attributes.register_attribute(MyTest, 'a', uselist=False, useobject=False)        attributes.register_attribute(MyTest, 'b', uselist=False, useobject=False)        m = MyTest()        m._state.expire_attributes(None)        assert 'a' not in m.__dict__        m2 = pickle.loads(pickle.dumps(m))        assert 'a' not in m2.__dict__        self.assertEquals(m2.a, "this is a")        self.assertEquals(m2.b, 12)    def test_list(self):        class User(object):pass        class Address(object):pass        attributes.register_class(User)        attributes.register_class(Address)        attributes.register_attribute(User, 'user_id', uselist = False, useobject=False)        attributes.register_attribute(User, 'user_name', uselist = False, useobject=False)        attributes.register_attribute(User, 'addresses', uselist = True, useobject=True)        attributes.register_attribute(Address, 'address_id', uselist = False, useobject=False)        attributes.register_attribute(Address, 'email_address', uselist = False, useobject=False)        u = User()        u.user_id = 7        u.user_name = 'john'        u.addresses = []        a = Address()        a.address_id = 10        a.email_address = 'lala@123.com'        u.addresses.append(a)        self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')        u, a._state.commit_all()        self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')        u.user_name = 'heythere'        a = Address()        a.address_id = 11        a.email_address = 'foo@bar.com'        u.addresses.append(a)        self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.addresses[0].email_address == 'lala@123.com' and u.addresses[1].email_address == 'foo@bar.com')    def test_lazytrackparent(self):        """test that the "hasparent" flag works properly when lazy loaders and backrefs are used"""        class Post(object):pass        class Blog(object):pass        attributes.register_class(Post)        attributes.register_class(Blog)        # set up instrumented attributes with backrefs        attributes.register_attribute(Post, 'blog', uselist=False, extension=attributes.GenericBackrefExtension('posts'), trackparent=True, useobject=True)        attributes.register_attribute(Blog, 'posts', uselist=True, extension=attributes.GenericBackrefExtension('blog'), trackparent=True, useobject=True)        # create objects as if they'd been freshly loaded from the database (without history)        b = Blog()        p1 = Post()        b._state.set_callable('posts', lambda:[p1])        p1._state.set_callable('blog', lambda:b)        p1, b._state.commit_all()        # no orphans (called before the lazy loaders fire off)        assert attributes.has_parent(Blog, p1, 'posts', optimistic=True)        assert attributes.has_parent(Post, b, 'blog', optimistic=True)        # assert connections        assert p1.blog is b        assert p1 in b.posts        # manual connections        b2 = Blog()        p2 = Post()        b2.posts.append(p2)        assert attributes.has_parent(Blog, p2, 'posts')        assert attributes.has_parent(Post, b2, 'blog')    def test_inheritance(self):        """tests that attributes are polymorphic"""        class Foo(object):pass        class Bar(Foo):pass        attributes.register_class(Foo)        attributes.register_class(Bar)        def func1():            print "func1"            return "this is the foo attr"        def func2():            print "func2"            return "this is the bar attr"        def func3():            print "func3"            return "this is the shared attr"        attributes.register_attribute(Foo, 'element', uselist=False, callable_=lambda o:func1, useobject=True)        attributes.register_attribute(Foo, 'element2', uselist=False, callable_=lambda o:func3, useobject=True)        attributes.register_attribute(Bar, 'element', uselist=False, callable_=lambda o:func2, useobject=True)        x = Foo()        y = Bar()        assert x.element == 'this is the foo attr'        assert y.element == 'this is the bar attr'        assert x.element2 == 'this is the shared attr'        assert y.element2 == 'this is the shared attr'    def test_no_double_state(self):        states = set()        class Foo(object):            def __init__(self):                states.add(self._state)        class Bar(Foo):            def __init__(self):                states.add(self._state)                Foo.__init__(self)        attributes.register_class(Foo)        attributes.register_class(Bar)        b = Bar()        self.assertEquals(len(states), 1)        self.assertEquals(list(states)[0].obj(), b)    def test_inheritance2(self):        """test that the attribute manager can properly traverse the managed attributes of an object,        if the object is of a descendant class with managed attributes in the parent class"""        class Foo(object):pass        class Bar(Foo):pass        attributes.register_class(Foo)        attributes.register_class(Bar)        attributes.register_attribute(Foo, 'element', uselist=False, useobject=True)        x = Bar()        x.element = 'this is the element'        self.assertEquals(attributes.get_history(x._state, 'element'), (['this is the element'],[], []))        x._state.commit_all()        (added, unchanged, deleted) = attributes.get_history(x._state, 'element')        assert added == []        assert unchanged == ['this is the element']    def test_lazyhistory(self):        """tests that history functions work with lazy-loading attributes"""        class Foo(fixtures.Base):            pass        class Bar(fixtures.Base):            pass        attributes.register_class(Foo)        attributes.register_class(Bar)        bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)]        def func1():            return "this is func 1"        def func2():            return [bar1, bar2, bar3]        attributes.register_attribute(Foo, 'col1', uselist=False, callable_=lambda o:func1, useobject=True)        attributes.register_attribute(Foo, 'col2', uselist=True, callable_=lambda o:func2, useobject=True)        attributes.register_attribute(Bar, 'id', uselist=False, useobject=True)        x = Foo()        x._state.commit_all()        x.col2.append(bar4)        self.assertEquals(attributes.get_history(x._state, 'col2'), ([bar4], [bar1, bar2, bar3], []))    def test_parenttrack(self):        class Foo(object):pass        class Bar(object):pass        attributes.register_class(Foo)        attributes.register_class(Bar)        attributes.register_attribute(Foo, 'element', uselist=False, trackparent=True, useobject=True)        attributes.register_attribute(Bar, 'element', uselist=False, trackparent=True, useobject=True)        f1 = Foo()        f2 = Foo()        b1 = Bar()        b2 = Bar()        f1.element = b1        b2.element = f2        assert attributes.has_parent(Foo, b1, 'element')        assert not attributes.has_parent(Foo, b2, 'element')        assert not attributes.has_parent(Foo, f2, 'element')        assert attributes.has_parent(Bar, f2, 'element')        b2.element = None        assert not attributes.has_parent(Bar, f2, 'element')        # test that double assignment doesn't accidentally reset the 'parent' flag.        b3 = Bar()        f4 = Foo()        b3.element = f4        assert attributes.has_parent(Bar, f4, 'element')        b3.element = f4        assert attributes.has_parent(Bar, f4, 'element')    def test_mutablescalars(self):        """test detection of changes on mutable scalar items"""        class Foo(object):pass        attributes.register_class(Foo)        attributes.register_attribute(Foo, 'element', uselist=False, copy_function=lambda x:[y for y in x], mutable_scalars=True, useobject=False)        x = Foo()        x.element = ['one', 'two', 'three']        x._state.commit_all()        x.element[1] = 'five'        assert x._state.is_modified()        attributes.unregister_class(Foo)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -