📄 unitofwork.py
字号:
] objects = [] for elem in data[1:]: item = Item() objects.append(item) item.item_name = elem['item_name'] item.keywords = [] if elem['keywords'][1]: klist = Session.query(keywordmapper).filter(keywords.c.name.in_([e['name'] for e in elem['keywords'][1]])) else: klist = [] khash = {} for k in klist: khash[k.name] = k for kname in [e['name'] for e in elem['keywords'][1]]: try: k = khash[kname] except KeyError: k = Keyword() k.name = kname item.keywords.append(k) Session.commit() l = Session.query(m).filter(items.c.item_name.in_([e['item_name'] for e in data[1:]])).order_by(items.c.item_name).all() self.assert_result(l, *data) objects[4].item_name = 'item4updated' k = Keyword() k.name = 'yellow' objects[5].keywords.append(k) self.assert_sql(testing.db, lambda:Session.commit(), [ { "UPDATE items SET item_name=:item_name WHERE items.item_id = :items_item_id": {'item_name': 'item4updated', 'items_item_id': objects[4].item_id} , "INSERT INTO keywords (name) VALUES (:name)": {'name': 'yellow'} }, ("INSERT INTO itemkeywords (item_id, keyword_id) VALUES (:item_id, :keyword_id)", lambda ctx: [{'item_id': objects[5].item_id, 'keyword_id': k.keyword_id}] ) ], with_sequences = [ { "UPDATE items SET item_name=:item_name WHERE items.item_id = :items_item_id": {'item_name': 'item4updated', 'items_item_id': objects[4].item_id} , "INSERT INTO keywords (keyword_id, name) VALUES (:keyword_id, :name)": lambda ctx: {'name': 'yellow', 'keyword_id':ctx.last_inserted_ids()[0]} }, ("INSERT INTO itemkeywords (item_id, keyword_id) VALUES (:item_id, :keyword_id)", lambda ctx: [{'item_id': objects[5].item_id, 'keyword_id': k.keyword_id}] ) ] ) objects[2].keywords.append(k) dkid = objects[5].keywords[1].keyword_id del objects[5].keywords[1] self.assert_sql(testing.db, lambda:Session.commit(), [ ( "DELETE FROM itemkeywords WHERE itemkeywords.item_id = :item_id AND itemkeywords.keyword_id = :keyword_id", [{'item_id': objects[5].item_id, 'keyword_id': dkid}] ), ( "INSERT INTO itemkeywords (item_id, keyword_id) VALUES (:item_id, :keyword_id)", lambda ctx: [{'item_id': objects[2].item_id, 'keyword_id': k.keyword_id}] ) ]) Session.delete(objects[3]) Session.commit() def test_manytomany_remove(self): """tests that setting a list-based attribute to '[]' properly affects the history and allows the many-to-many rows to be deleted""" keywordmapper = mapper(Keyword, keywords) m = mapper(Item, orderitems, properties = dict( keywords = relation(keywordmapper, itemkeywords, lazy = False), )) i = Item() k1 = Keyword() k2 = Keyword() i.keywords.append(k1) i.keywords.append(k2) Session.commit() assert itemkeywords.count().scalar() == 2 i.keywords = [] Session.commit() assert itemkeywords.count().scalar() == 0 def test_scalar(self): """test that dependency.py doesnt try to delete an m2m relation referencing None.""" mapper(Keyword, keywords) mapper(Item, orderitems, properties = dict( keyword = relation(Keyword, secondary=itemkeywords, uselist=False), )) i = Item() Session.commit() Session.delete(i) Session.commit() def test_manytomany_update(self): """tests some history operations on a many to many""" class Keyword(object): def __init__(self, name): self.name = name def __eq__(self, other): return other.__class__ == Keyword and other.name == self.name def __repr__(self): return "Keyword(%s, %s)" % (getattr(self, 'keyword_id', 'None'), self.name) mapper(Keyword, keywords) mapper(Item, orderitems, properties = dict( keywords = relation(Keyword, secondary=itemkeywords, lazy=False, order_by=keywords.c.name), )) (k1, k2, k3) = (Keyword('keyword 1'), Keyword('keyword 2'), Keyword('keyword 3')) item = Item() item.item_name = 'item 1' item.keywords.append(k1) item.keywords.append(k2) item.keywords.append(k3) Session.commit() item.keywords = [] item.keywords.append(k1) item.keywords.append(k2) Session.commit() Session.close() item = Session.query(Item).get(item.item_id) print [k1, k2] print item.keywords assert item.keywords == [k1, k2] def test_association(self): """basic test of an association object""" class IKAssociation(object): def __repr__(self): return "\nIKAssociation " + repr(self.item_id) + " " + repr(self.keyword) items = orderitems keywordmapper = mapper(Keyword, keywords) # note that we are breaking a rule here, making a second mapper(Keyword, keywords) # the reorganization of mapper construction affected this, but was fixed again m = mapper(Item, items, properties = dict( keywords = relation(mapper(IKAssociation, itemkeywords, properties = dict( keyword = relation(mapper(Keyword, keywords, non_primary=True), lazy = False, uselist = False, order_by=keywords.c.name) ), primary_key = [itemkeywords.c.item_id, itemkeywords.c.keyword_id]), lazy = False) )) data = [Item, {'item_name': 'a_item1', 'keywords' : (IKAssociation, [ {'keyword' : (Keyword, {'name': 'big'})}, {'keyword' : (Keyword, {'name': 'green'})}, {'keyword' : (Keyword, {'name': 'purple'})}, {'keyword' : (Keyword, {'name': 'round'})} ] ) }, {'item_name': 'a_item2', 'keywords' : (IKAssociation, [ {'keyword' : (Keyword, {'name': 'huge'})}, {'keyword' : (Keyword, {'name': 'violet'})}, {'keyword' : (Keyword, {'name': 'yellow'})} ] ) }, {'item_name': 'a_item3', 'keywords' : (IKAssociation, [ {'keyword' : (Keyword, {'name': 'big'})}, {'keyword' : (Keyword, {'name': 'blue'})}, ] ) } ] for elem in data[1:]: item = Item() item.item_name = elem['item_name'] item.keywords = [] for kname in [e['keyword'][1]['name'] for e in elem['keywords'][1]]: try: k = Keyword.query.filter(keywords.c.name == kname)[0] except IndexError: k = Keyword() k.name= kname ik = IKAssociation() ik.keyword = k item.keywords.append(ik) Session.commit() Session.close() l = Item.query.filter(items.c.item_name.in_([e['item_name'] for e in data[1:]])).order_by(items.c.item_name).all() self.assert_result(l, *data)class SaveTest2(ORMTest): def define_tables(self, metadata): global users, addresses users = Table('users', metadata, Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True), Column('user_name', String(20)), ) addresses = Table('email_addresses', metadata, Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True), Column('rel_user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(20)), ) def test_m2o_nonmatch(self): m = mapper(Address, addresses, properties = dict( user = relation(mapper(User, users), lazy = True, uselist = False) )) data = [ {'user_name' : 'thesub' , 'email_address' : 'bar@foo.com'}, {'user_name' : 'assdkfj' , 'email_address' : 'thesdf@asdf.com'}, ] objects = [] for elem in data: a = Address() a.email_address = elem['email_address'] a.user = User() a.user.user_name = elem['user_name'] objects.append(a) self.assert_sql(testing.db, lambda: Session.commit(), [ ( "INSERT INTO users (user_name) VALUES (:user_name)", {'user_name': 'thesub'} ), ( "INSERT INTO users (user_name) VALUES (:user_name)", {'user_name': 'assdkfj'} ), ( "INSERT INTO email_addresses (rel_user_id, email_address) VALUES (:rel_user_id, :email_address)", {'rel_user_id': 1, 'email_address': 'bar@foo.com'} ), ( "INSERT INTO email_addresses (rel_user_id, email_address) VALUES (:rel_user_id, :email_address)", {'rel_user_id': 2, 'email_address': 'thesdf@asdf.com'} ) ], with_sequences = [ ( "INSERT INTO users (user_id, user_name) VALUES (:user_id, :user_name)", lambda ctx: {'user_name': 'thesub', 'user_id':ctx.last_inserted_ids()[0]} ), ( "INSERT INTO users (user_id, user_name) VALUES (:user_id, :user_name)", lambda ctx: {'user_name': 'assdkfj', 'user_id':ctx.last_inserted_ids()[0]} ), ( "INSERT INTO email_addresses (address_id, rel_user_id, email_address) VALUES (:address_id, :rel_user_id, :email_address)", lambda ctx:{'rel_user_id': 1, 'email_address': 'bar@foo.com', 'address_id':ctx.last_inserted_ids()[0]} ), ( "INSERT INTO email_addresses (address_id, rel_user_id, email_address) VALUES (:address_id, :rel_user_id, :email_address)", lambda ctx:{'rel_user_id': 2, 'email_address': 'thesdf@asdf.com', 'address_id':ctx.last_inserted_ids()[0]} ) ] )class SaveTest3(ORMTest): def define_tables(self, metadata): global t1, t2, t3 t1 = Table('items', metadata, Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True), Column('item_name', VARCHAR(50)), ) t3 = Table('keywords', metadata, Column('keyword_id', Integer, Sequence('keyword_id_seq', optional=True), primary_key = True), Column('name', VARCHAR(50)), ) t2 = Table('assoc', metadata, Column('item_id', INT, ForeignKey("items")), Column('keyword_id', INT, ForeignKey("keywords")), Column('foo', Boolean, default=True) ) def test_manytomany_xtracol_delete(self): """test that a many-to-many on a table that has an extra column can properly delete rows from the table without referencing the extra column""" mapper(Keyword, t3) mapper(Item, t1, properties = dict( keywords = relation(Keyword, secondary=t2, lazy = False), )) i = Item() k1 = Keyword() k2 = Keyword() i.keywords.append(k1) i.keywords.append(k2) Session.commit() assert t2.count().scalar() == 2 i.keywords = [] print i.keywords Session.commit() assert t2.count().scalar() == 0class BooleanColTest(ORMTest): def define_tables(self, metadata): global t t =Table('t1', metadata, Column('id', Integer, primary_key=True), Column('name', String(30)), Column('value', Boolean)) def test_boolean(self): # use the regular mapper from sqlalchemy.orm import mapper class T(fixtures.Base): pass mapper(T, t) sess = create_session() t1 = T(value=True, name="t1") t2 = T(value=False, name="t2") t3 = T(value=True, name="t3") sess.save(t1) sess.save(t2) sess.save(t3) sess.flush() for clear in (False, True): if clear: sess.clear() self.assertEquals(sess.query(T).all(), [T(value=True, name="t1"), T(value=False, name="t2"), T(value=True, name="t3")]) if clear: sess.clear() self.assertEquals(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")]) if clear: sess.clear() self.assertEquals(sess.query(T).filter(T.value==False).all(), [T(value=False, name="t2")]) t2 = sess.query(T).get(t2.id) t2.value = True sess.flush() self.assertEquals(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"), T(value=True, name="t2"), T(value=True, name="t3")]) t2.value = False sess.flush() self.assertEquals(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -