📄 attributes.py
字号:
import testenv; testenv.configure_for_tests()import pickleimport sqlalchemy.orm.attributes as attributesfrom sqlalchemy.orm.collections import collectionfrom sqlalchemy import exceptionsfrom testlib import *from testlib import fixturesROLLBACK_SUPPORTED=False# these test classes defined at the module# level to support picklingclass MyTest(object):passclass MyTest2(object):passclass AttributesTest(TestBase): def test_basic(self): class User(object):pass attributes.register_class(User) attributes.register_attribute(User, 'user_id', uselist = False, useobject=False) attributes.register_attribute(User, 'user_name', uselist = False, useobject=False) attributes.register_attribute(User, 'email_address', uselist = False, useobject=False) u = User() u.user_id = 7 u.user_name = 'john' u.email_address = 'lala@123.com' self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com') u._state.commit_all() self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com') u.user_name = 'heythere' u.email_address = 'foo@bar.com' self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.email_address == 'foo@bar.com') def test_pickleness(self): attributes.register_class(MyTest) attributes.register_class(MyTest2) attributes.register_attribute(MyTest, 'user_id', uselist = False, useobject=False) attributes.register_attribute(MyTest, 'user_name', uselist = False, useobject=False) attributes.register_attribute(MyTest, 'email_address', uselist = False, useobject=False) attributes.register_attribute(MyTest2, 'a', uselist = False, useobject=False) attributes.register_attribute(MyTest2, 'b', uselist = False, useobject=False) # shouldnt be pickling callables at the class level def somecallable(*args): return None attr_name = 'mt2' attributes.register_attribute(MyTest, attr_name, uselist = True, trackparent=True, callable_=somecallable, useobject=True) o = MyTest() o.mt2.append(MyTest2()) o.user_id=7 o.mt2[0].a = 'abcde' pk_o = pickle.dumps(o) o2 = pickle.loads(pk_o) pk_o2 = pickle.dumps(o2) # so... pickle is creating a new 'mt2' string after a roundtrip here, # so we'll brute-force set it to be id-equal to the original string if False: o_mt2_str = [ k for k in o.__dict__ if k == 'mt2'][0] o2_mt2_str = [ k for k in o2.__dict__ if k == 'mt2'][0] self.assert_(o_mt2_str == o2_mt2_str) self.assert_(o_mt2_str is not o2_mt2_str) # change the id of o2.__dict__['mt2'] former = o2.__dict__['mt2'] del o2.__dict__['mt2'] o2.__dict__[o_mt2_str] = former self.assert_(pk_o == pk_o2) # the above is kind of distrurbing, so let's do it again a little # differently. the string-id in serialization thing is just an # artifact of pickling that comes up in the first round-trip. # a -> b differs in pickle memoization of 'mt2', but b -> c will # serialize identically. o3 = pickle.loads(pk_o2) pk_o3 = pickle.dumps(o3) o4 = pickle.loads(pk_o3) pk_o4 = pickle.dumps(o4) self.assert_(pk_o3 == pk_o4) # and lastly make sure we still have our data after all that. # identical serialzation is great, *if* it's complete :) self.assert_(o4.user_id == 7) self.assert_(o4.user_name is None) self.assert_(o4.email_address is None) self.assert_(len(o4.mt2) == 1) self.assert_(o4.mt2[0].a == 'abcde') self.assert_(o4.mt2[0].b is None) def test_deferred(self): class Foo(object):pass data = {'a':'this is a', 'b':12} def loader(instance, keys): for k in keys: instance.__dict__[k] = data[k] return attributes.ATTR_WAS_SET attributes.register_class(Foo, deferred_scalar_loader=loader) attributes.register_attribute(Foo, 'a', uselist=False, useobject=False) attributes.register_attribute(Foo, 'b', uselist=False, useobject=False) f = Foo() f._state.expire_attributes(None) self.assertEquals(f.a, "this is a") self.assertEquals(f.b, 12) f.a = "this is some new a" f._state.expire_attributes(None) self.assertEquals(f.a, "this is a") self.assertEquals(f.b, 12) f._state.expire_attributes(None) f.a = "this is another new a" self.assertEquals(f.a, "this is another new a") self.assertEquals(f.b, 12) f._state.expire_attributes(None) self.assertEquals(f.a, "this is a") self.assertEquals(f.b, 12) del f.a self.assertEquals(f.a, None) self.assertEquals(f.b, 12) f._state.commit_all() self.assertEquals(f.a, None) self.assertEquals(f.b, 12) def test_deferred_pickleable(self): data = {'a':'this is a', 'b':12} def loader(instance, keys): for k in keys: instance.__dict__[k] = data[k] return attributes.ATTR_WAS_SET attributes.register_class(MyTest, deferred_scalar_loader=loader) attributes.register_attribute(MyTest, 'a', uselist=False, useobject=False) attributes.register_attribute(MyTest, 'b', uselist=False, useobject=False) m = MyTest() m._state.expire_attributes(None) assert 'a' not in m.__dict__ m2 = pickle.loads(pickle.dumps(m)) assert 'a' not in m2.__dict__ self.assertEquals(m2.a, "this is a") self.assertEquals(m2.b, 12) def test_list(self): class User(object):pass class Address(object):pass attributes.register_class(User) attributes.register_class(Address) attributes.register_attribute(User, 'user_id', uselist = False, useobject=False) attributes.register_attribute(User, 'user_name', uselist = False, useobject=False) attributes.register_attribute(User, 'addresses', uselist = True, useobject=True) attributes.register_attribute(Address, 'address_id', uselist = False, useobject=False) attributes.register_attribute(Address, 'email_address', uselist = False, useobject=False) u = User() u.user_id = 7 u.user_name = 'john' u.addresses = [] a = Address() a.address_id = 10 a.email_address = 'lala@123.com' u.addresses.append(a) self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com') u, a._state.commit_all() self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com') u.user_name = 'heythere' a = Address() a.address_id = 11 a.email_address = 'foo@bar.com' u.addresses.append(a) self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.addresses[0].email_address == 'lala@123.com' and u.addresses[1].email_address == 'foo@bar.com') def test_lazytrackparent(self): """test that the "hasparent" flag works properly when lazy loaders and backrefs are used""" class Post(object):pass class Blog(object):pass attributes.register_class(Post) attributes.register_class(Blog) # set up instrumented attributes with backrefs attributes.register_attribute(Post, 'blog', uselist=False, extension=attributes.GenericBackrefExtension('posts'), trackparent=True, useobject=True) attributes.register_attribute(Blog, 'posts', uselist=True, extension=attributes.GenericBackrefExtension('blog'), trackparent=True, useobject=True) # create objects as if they'd been freshly loaded from the database (without history) b = Blog() p1 = Post() b._state.set_callable('posts', lambda:[p1]) p1._state.set_callable('blog', lambda:b) p1, b._state.commit_all() # no orphans (called before the lazy loaders fire off) assert attributes.has_parent(Blog, p1, 'posts', optimistic=True) assert attributes.has_parent(Post, b, 'blog', optimistic=True) # assert connections assert p1.blog is b assert p1 in b.posts # manual connections b2 = Blog() p2 = Post() b2.posts.append(p2) assert attributes.has_parent(Blog, p2, 'posts') assert attributes.has_parent(Post, b2, 'blog') def test_inheritance(self): """tests that attributes are polymorphic""" class Foo(object):pass class Bar(Foo):pass attributes.register_class(Foo) attributes.register_class(Bar) def func1(): print "func1" return "this is the foo attr" def func2(): print "func2" return "this is the bar attr" def func3(): print "func3" return "this is the shared attr" attributes.register_attribute(Foo, 'element', uselist=False, callable_=lambda o:func1, useobject=True) attributes.register_attribute(Foo, 'element2', uselist=False, callable_=lambda o:func3, useobject=True) attributes.register_attribute(Bar, 'element', uselist=False, callable_=lambda o:func2, useobject=True) x = Foo() y = Bar() assert x.element == 'this is the foo attr' assert y.element == 'this is the bar attr' assert x.element2 == 'this is the shared attr' assert y.element2 == 'this is the shared attr' def test_no_double_state(self): states = set() class Foo(object): def __init__(self): states.add(self._state) class Bar(Foo): def __init__(self): states.add(self._state) Foo.__init__(self) attributes.register_class(Foo) attributes.register_class(Bar) b = Bar() self.assertEquals(len(states), 1) self.assertEquals(list(states)[0].obj(), b) def test_inheritance2(self): """test that the attribute manager can properly traverse the managed attributes of an object, if the object is of a descendant class with managed attributes in the parent class""" class Foo(object):pass class Bar(Foo):pass attributes.register_class(Foo) attributes.register_class(Bar) attributes.register_attribute(Foo, 'element', uselist=False, useobject=True) x = Bar() x.element = 'this is the element' self.assertEquals(attributes.get_history(x._state, 'element'), (['this is the element'],[], [])) x._state.commit_all() (added, unchanged, deleted) = attributes.get_history(x._state, 'element') assert added == [] assert unchanged == ['this is the element'] def test_lazyhistory(self): """tests that history functions work with lazy-loading attributes""" class Foo(fixtures.Base): pass class Bar(fixtures.Base): pass attributes.register_class(Foo) attributes.register_class(Bar) bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)] def func1(): return "this is func 1" def func2(): return [bar1, bar2, bar3] attributes.register_attribute(Foo, 'col1', uselist=False, callable_=lambda o:func1, useobject=True) attributes.register_attribute(Foo, 'col2', uselist=True, callable_=lambda o:func2, useobject=True) attributes.register_attribute(Bar, 'id', uselist=False, useobject=True) x = Foo() x._state.commit_all() x.col2.append(bar4) self.assertEquals(attributes.get_history(x._state, 'col2'), ([bar4], [bar1, bar2, bar3], [])) def test_parenttrack(self): class Foo(object):pass class Bar(object):pass attributes.register_class(Foo) attributes.register_class(Bar) attributes.register_attribute(Foo, 'element', uselist=False, trackparent=True, useobject=True) attributes.register_attribute(Bar, 'element', uselist=False, trackparent=True, useobject=True) f1 = Foo() f2 = Foo() b1 = Bar() b2 = Bar() f1.element = b1 b2.element = f2 assert attributes.has_parent(Foo, b1, 'element') assert not attributes.has_parent(Foo, b2, 'element') assert not attributes.has_parent(Foo, f2, 'element') assert attributes.has_parent(Bar, f2, 'element') b2.element = None assert not attributes.has_parent(Bar, f2, 'element') # test that double assignment doesn't accidentally reset the 'parent' flag. b3 = Bar() f4 = Foo() b3.element = f4 assert attributes.has_parent(Bar, f4, 'element') b3.element = f4 assert attributes.has_parent(Bar, f4, 'element') def test_mutablescalars(self): """test detection of changes on mutable scalar items""" class Foo(object):pass attributes.register_class(Foo) attributes.register_attribute(Foo, 'element', uselist=False, copy_function=lambda x:[y for y in x], mutable_scalars=True, useobject=False) x = Foo() x.element = ['one', 'two', 'three'] x._state.commit_all() x.element[1] = 'five' assert x._state.is_modified() attributes.unregister_class(Foo)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -