📄 references.py
字号:
#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from storm.exceptions import WrongStoreError, NoStoreError, ClassInfoError
from storm.store import Store, get_where_for_args
from storm.expr import (
    Select, Column, Exists, ComparableExpr, LeftJoin, Undef, SQLRaw,
    compare_columns, compile)
from storm.info import *


__all__ = ["Reference", "ReferenceSet", "Proxy"]


class Reference(object):
    """Descriptor for one-to-one relationships."""

    def __init__(self, local_key, remote_key, on_remote=False):
        """Store the unresolved keys; the relation is built lazily.

        @param local_key: Key on the class holding this descriptor.
        @param remote_key: Key on the referred class.
        @param on_remote: Passed through to the L{Relation} built later.
        """
        # Reference internals are public to the Proxy.
        self._local_key = local_key
        self._remote_key = remote_key
        self._on_remote = on_remote
        self._relation = None
        self._cls = None

    def __get__(self, local, cls=None):
        """Return the object referred to by C{local}.

        When accessed on the class (C{local} is None) the descriptor
        itself is returned.  Otherwise the already linked remote object
        is returned if there is one; failing that, it is looked up in the
        store of C{local} and linked.  The result is None when C{local}
        has no store, and may be None when the lookup finds nothing.
        """
        if local is None:
            if self._cls is None:
                # Must set earlier, since __eq__() has no access
                # to the used class.
                self._cls = _find_descriptor_class(cls, self)
            return self

        if self._relation is None:
            # Don't use local.__class__ here, as it might be security
            # proxied or something. # XXX UNTESTED!
            self._build_relation(get_obj_info(local).cls_info.cls)

        remote = self._relation.get_remote(local)
        if remote is not None:
            return remote

        store = Store.of(local)
        if store is None:
            return None

        if self._relation.remote_key_is_primary:
            # The remote key is the primary key, so a direct get() by
            # the local variables is enough.
            remote = store.get(self._relation.remote_cls,
                               self._relation.get_local_variables(local))
        else:
            where = self._relation.get_where_for_remote(local)
            result = store.find(self._relation.remote_cls, where)
            remote = result.one()

        if remote is not None:
            self._relation.link(local, remote)

        return remote

    def __set__(self, local, remote):
        """Link C{local} to C{remote}, or unlink it when C{remote} is None."""
        if self._relation is None:
            # Don't use local.__class__ here, as it might be security
            # proxied or something. # XXX UNTESTED!
            self._build_relation(get_obj_info(local).cls_info.cls)

        if remote is None:
            # Setting None drops the currently linked object, if any.
            remote = self._relation.get_remote(local)
            if remote is not None:
                self._relation.unlink(get_obj_info(local),
                                      get_obj_info(remote), True)
        else:
            self._relation.link(local, remote, True)

    def _build_relation(self, used_cls=None):
        """Resolve both keys and create the L{Relation}.

        @param used_cls: Class through which the descriptor was reached.
            Required only while the owning class is still unknown.
        """
        if self._cls is None:
            assert used_cls is not None
            self._cls = _find_descriptor_class(used_cls, self)

        resolver = PropertyResolver(self, self._cls)

        self._local_key = resolver.resolve(self._local_key)
        self._remote_key = resolver.resolve(self._remote_key)
        self._relation = Relation(self._local_key, self._remote_key,
                                  False, self._on_remote)

    def __eq__(self, other):
        """Return the relation's where-expression for C{other}.

        Note that this is whatever C{Relation.get_where_for_local()}
        returns, not a boolean.
        """
        if self._relation is None:
            self._build_relation()
        return self._relation.get_where_for_local(other)

    # Defining __eq__ alone makes instances unhashable on Python 3;
    # keep the default identity-based hash.
    __hash__ = object.__hash__


class ReferenceSet(object):
    """Descriptor returning a bound set of referred objects.

    With only the first pair of keys a L{BoundReferenceSet} is returned;
    when the second pair is given as well, a
    L{BoundIndirectReferenceSet} using both relations is returned.
    """

    def __init__(self, local_key1, remote_key1,
                 remote_key2=None, local_key2=None, order_by=None):
        """Store the unresolved keys; relations are built lazily."""
        self._local_key1 = local_key1
        self._remote_key1 = remote_key1
        self._remote_key2 = remote_key2
        self._local_key2 = local_key2
        self._order_by = order_by
        self._relation1 = None
        self._relation2 = None

    def __get__(self, local, cls=None):
        """Return a set bound to C{local}, or self on class access."""
        if local is None:
            return self

        if self._relation1 is None:
            # Don't use local.__class__ here, as it might be security
            # proxied or something. # XXX UNTESTED!
            self._build_relations(get_obj_info(local).cls_info.cls)

        # No store check is done here: a bound set is handed out even
        # for objects without a store, and the bound set operations that
        # need one raise NoStoreError themselves.
        if self._relation2 is None:
            return BoundReferenceSet(self._relation1, local, self._order_by)
        else:
            return BoundIndirectReferenceSet(self._relation1,
                                             self._relation2, local,
                                             self._order_by)

    def _build_relations(self, used_cls):
        """Resolve the keys against C{used_cls} and create the relations."""
        resolver = PropertyResolver(self, used_cls)

        self._local_key1 = resolver.resolve(self._local_key1)
        self._remote_key1 = resolver.resolve(self._remote_key1)
        self._relation1 = Relation(self._local_key1, self._remote_key1,
                                   True, True)

        # The second relation exists only when both of its keys are given.
        if self._local_key2 and self._remote_key2:
            self._local_key2 = resolver.resolve(self._local_key2)
            self._remote_key2 = resolver.resolve(self._remote_key2)
            self._relation2 = Relation(self._local_key2, self._remote_key2,
                                       True, True)


class BoundReferenceSetBase(object):
    """Convenience methods shared by the bound reference sets.

    Every method delegates to the result of C{self.find()}, which
    subclasses must provide.
    """

    def __iter__(self):
        return self.find().__iter__()

    def first(self, *args, **kwargs):
        return self.find(*args, **kwargs).first()

    def last(self, *args, **kwargs):
        return self.find(*args, **kwargs).last()

    def any(self, *args, **kwargs):
        return self.find(*args, **kwargs).any()

    def one(self, *args, **kwargs):
        return self.find(*args, **kwargs).one()

    def values(self, *columns):
        return self.find().values(*columns)

    def order_by(self, *args):
        return self.find().order_by(*args)

    def count(self):
        return self.find().count()


class BoundReferenceSet(BoundReferenceSetBase):
    """Reference set bound to one local object through one relation."""

    def __init__(self, relation, local, order_by):
        self._relation = relation
        self._local = local
        self._target_cls = self._relation.remote_cls
        self._order_by = order_by

    def find(self, *args, **kwargs):
        """Find remote objects referring to the local object.

        Extra arguments are passed on to C{store.find()}.

        @raise NoStoreError: If the local object has no store.
        """
        store = Store.of(self._local)
        if store is None:
            raise NoStoreError("Can't perform operation without a store")
        where = self._relation.get_where_for_remote(self._local)
        result = store.find(self._target_cls, where, *args, **kwargs)
        if self._order_by is not None:
            result.order_by(self._order_by)
        return result

    def clear(self, *args, **kwargs):
        """Set the remote key columns of matching objects to None.

        Extra arguments are passed on to C{store.find()}.

        @raise NoStoreError: If the local object has no store.
        """
        set_kwargs = {}
        for remote_column in self._relation.remote_key:
            set_kwargs[remote_column.name] = None
        store = Store.of(self._local)
        if store is None:
            raise NoStoreError("Can't perform operation without a store")
        where = self._relation.get_where_for_remote(self._local)
        store.find(self._target_cls, where, *args, **kwargs).set(**set_kwargs)

    def add(self, remote):
        """Link C{remote} to the local object."""
        self._relation.link(self._local, remote, True)

    def remove(self, remote):
        """Unlink C{remote} from the local object."""
        self._relation.unlink(get_obj_info(self._local),
                              get_obj_info(remote), True)


class BoundIndirectReferenceSet(BoundReferenceSetBase):
    """Reference set bound to one local object through a link class.

    C{relation1} connects the local object to the link class and
    C{relation2} connects the target class to the link class.
    """

    def __init__(self, relation1, relation2, local, order_by):
        self._relation1 = relation1
        self._relation2 = relation2
        self._local = local
        self._order_by = order_by

        self._target_cls = relation2.local_cls
        self._link_cls = relation1.remote_cls

    def find(self, *args, **kwargs):
        """Find target objects joined to the local object.

        Extra arguments are passed on to C{store.find()}.

        @raise NoStoreError: If the local object has no store.
        """
        store = Store.of(self._local)
        if store is None:
            raise NoStoreError("Can't perform operation without a store")
        where = (self._relation1.get_where_for_remote(self._local) &
                 self._relation2.get_where_for_join())
        result = store.find(self._target_cls, where, *args, **kwargs)
        if self._order_by is not None:
            result.order_by(self._order_by)
        return result

    def clear(self, *args, **kwargs):
        """Remove link objects belonging to the local object.

        When arguments are given, only links whose target matches them
        are removed.

        @raise NoStoreError: If the local object has no store.
        """
        store = Store.of(self._local)
        if store is None:
            raise NoStoreError("Can't perform operation without a store")
        where = self._relation1.get_where_for_remote(self._local)
        if args or kwargs:
            # Restrict the links to those whose target matches the filter.
            filter_expr = get_where_for_args(args, kwargs, self._target_cls)
            join = self._relation2.get_where_for_join()
            table = get_cls_info(self._target_cls).table
            where &= Exists(Select(SQLRaw("*"), join & filter_expr,
                                   tables=table))
        store.find(self._link_cls, where).remove()

    def add(self, remote):
        """Create a new link object tying C{remote} to the local object."""
        link = self._link_cls()
        self._relation1.link(self._local, link, True)
        # Don't use remote here, as it might be security
        # proxied or something. # XXX UNTESTED!
        self._relation2.link(get_obj_info(remote).get_obj(), link, True)

    def remove(self, remote):
        """Remove the link objects tying C{remote} to the local object.

        @raise NoStoreError: If the local object has no store.
        """
        store = Store.of(self._local)
        if store is None:
            raise NoStoreError("Can't perform operation without a store")
        where = (self._relation1.get_where_for_remote(self._local) &
                 self._relation2.get_where_for_remote(remote))
        store.find(self._link_cls, where).remove()


class Proxy(ComparableExpr):
    """Proxy exposes a referred object's column as a local column.

    For example:

        class Foo(object):
            bar_id = Int()
            bar = Reference(bar_id, Bar.id)
            bar_title = Proxy(bar, Bar.title)

    For most uses, Foo.bar_title should behave as if it were
    a native property of Foo.
    """

    class RemoteProp(object):
        """
        This descriptor will resolve and set the _remote_prop attribute
        when it's first used. It avoids having a test at every single
        place where the attribute is touched.
        """
        def __get__(self, obj, cls=None):
            resolver = PropertyResolver(obj, obj._cls)
            obj._remote_prop = resolver.resolve_one(obj._unresolved_prop)
            return obj._remote_prop

    _remote_prop = RemoteProp()

    def __init__(self, reference, remote_prop):
        self._reference = reference
        self._unresolved_prop = remote_prop
        self._cls = None

    def __get__(self, obj, cls=None):
        """Return the remote property's value for the referred object.

        On class access (C{obj} is None) the proxy itself is returned.
        """
        if self._cls is None:
            self._cls = _find_descriptor_class(cls, self)
        if obj is None:
            return self
        # Have you counted how many descriptors we're dealing with here? ;-)
        return self._remote_prop.__get__(self._reference.__get__(obj))

    def __set__(self, obj, value):
        """Set the remote property on the object referred to by C{obj}."""
        return self._remote_prop.__set__(self._reference.__get__(obj), value)

    @property
    def variable_factory(self):
        return self._remote_prop.variable_factory


@compile.when(Proxy)
def compile_proxy(compile, state, proxy):
    """Compile a L{Proxy}: add the join and compile the remote property."""
    # References build the relation lazily so that they don't immediately
    # try to resolve string properties. Unfortunately we have to check that
    # here as well and make sure that at this point it's actually there.
    # Maybe this should use the same trick as _remote_prop on Proxy
    if proxy._reference._relation is None:
        proxy._reference._build_relation()

    # Inject the join between the table of the class holding the proxy
    # and the table of the class which is the target of the reference.
    left_join = LeftJoin(proxy._remote_prop.table,
                         proxy._reference._relation.get_where_for_join())
    state.auto_tables.append(left_join)

    # And compile the remote property normally.
    return compile(state, proxy._remote_prop)


class Relation(object):

    def __init__(self, local_key, remote_key, many, on_remote):
        assert type(local_key) is tuple and type(remote_key) is tuple

        self.local_key = local_key
        self.remote_key = remote_key

        self.local_cls = getattr(self.local_key[0], "cls", None)
        self.remote_cls = self.remote_key[0].cls
        self.remote_key_is_primary = False

        # The remote key counts as primary when it has the same length
        # as the remote class' primary key and the column names match
        # pairwise.
        primary_key = get_cls_info(self.remote_cls).primary_key
        if len(primary_key) == len(self.remote_key):
            for column1, column2 in zip(self.remote_key, primary_key):
                if column1.name != column2.name:
                    break
            else:
                self.remote_key_is_primary = True

        self.many = many
        self.on_remote = on_remote

        # XXX These should probably be weak dictionaries.
        self._local_columns = {}
        self._remote_columns = {}

        self._l_to_r = {}
        self._r_to_l = {}

    def get_remote(self, local):
        return get_obj_info(local).get(self)

    def get_where_for_remote(self, local):
        """Generate a column comparison expression for reference properties.

        The returned expression may be used to find objects of the I{remote}
        type referring to C{local}.
        """
        local_variables = self.get_local_variables(local)
        for variable in local_variables:
            if not variable.is_defined():
                Store.of(local).flush()
        # NOTE(review): the reviewed excerpt ends at the flush() call
        # above.  The rest of this method (presumably including the
        # return of the comparison expression promised by the docstring)
        # and the remainder of the class were not visible -- restore them
        # from the complete file.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -