📄 properties.py
字号:
# properties.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.php"""MapperProperty implementations.This is a private module which defines the behavior ofinvidual ORM-mapped attributes."""from sqlalchemy import sql, schema, util, exceptions, loggingfrom sqlalchemy.sql.util import ClauseAdapter, ColumnsInClausefrom sqlalchemy.sql import visitors, operators, ColumnElementfrom sqlalchemy.orm import mapper, sync, strategies, attributes, dependency, object_mapperfrom sqlalchemy.orm import session as sessionlibfrom sqlalchemy.orm.util import CascadeOptionsfrom sqlalchemy.orm.interfaces import StrategizedProperty, PropComparator, MapperPropertyfrom sqlalchemy.exceptions import ArgumentErrorimport weakref__all__ = ('ColumnProperty', 'CompositeProperty', 'SynonymProperty', 'PropertyLoader', 'BackRef')class ColumnProperty(StrategizedProperty): """Describes an object attribute that corresponds to a table column.""" def __init__(self, *columns, **kwargs): """The list of `columns` describes a single object property. If there are multiple tables joined together for the mapper, this list represents the equivalent column as it appears across each table. """ self.columns = list(columns) self.group = kwargs.pop('group', None) self.deferred = kwargs.pop('deferred', False) self.comparator = ColumnProperty.ColumnComparator(self) # sanity check for col in columns: if not isinstance(col, ColumnElement): raise ArgumentError('column_property() must be given a ColumnElement as its argument. Try .label() or .as_scalar() for Selectables to fix this.') def create_strategy(self): if self.deferred: return strategies.DeferredColumnLoader(self) else: return strategies.ColumnLoader(self) def do_init(self): super(ColumnProperty, self).do_init() if len(self.columns) > 1 and self.parent.primary_key.issuperset(self.columns): util.warn( ("On mapper %s, primary key column '%s' is being combined " "with distinct primary key column '%s' in attribute '%s'. " "Use explicit properties to give each column its own mapped " "attribute name.") % (str(self.parent), str(self.columns[1]), str(self.columns[0]), self.key)) def copy(self): return ColumnProperty(deferred=self.deferred, group=self.group, *self.columns) def getattr(self, state, column): return getattr(state.class_, self.key).impl.get(state) def getcommitted(self, state, column): return getattr(state.class_, self.key).impl.get_committed_value(state) def setattr(self, state, value, column): getattr(state.class_, self.key).impl.set(state, value, None) def merge(self, session, source, dest, dont_load, _recursive): value = attributes.get_as_list(source._state, self.key, passive=True) if value: setattr(dest, self.key, value[0]) else: # TODO: lazy callable should merge to the new instance dest._state.expire_attributes([self.key]) def get_col_value(self, column, value): return value class ColumnComparator(PropComparator): def clause_element(self): return self.prop.columns[0] def operate(self, op, *other): return op(self.prop.columns[0], *other) def reverse_operate(self, op, other): col = self.prop.columns[0] return op(col._bind_param(other), col)ColumnProperty.logger = logging.class_logger(ColumnProperty)class CompositeProperty(ColumnProperty): """subclasses ColumnProperty to provide composite type support.""" def __init__(self, class_, *columns, **kwargs): super(CompositeProperty, self).__init__(*columns, **kwargs) self.composite_class = class_ self.comparator = kwargs.pop('comparator', CompositeProperty.Comparator)(self) def do_init(self): super(ColumnProperty, self).do_init() # TODO: similar PK check as ColumnProperty does ? def copy(self): return CompositeProperty(deferred=self.deferred, group=self.group, composite_class=self.composite_class, *self.columns) def getattr(self, state, column): obj = getattr(state.class_, self.key).impl.get(state) return self.get_col_value(column, obj) def getcommitted(self, state, column): obj = getattr(state.class_, self.key).impl.get_committed_value(state) return self.get_col_value(column, obj) def setattr(self, state, value, column): # TODO: test coverage for this method obj = getattr(state.class_, self.key).impl.get(state) if obj is None: obj = self.composite_class(*[None for c in self.columns]) getattr(state.class_, self.key).impl.set(state, obj, None) for a, b in zip(self.columns, value.__composite_values__()): if a is column: setattr(obj, b, value) def get_col_value(self, column, value): for a, b in zip(self.columns, value.__composite_values__()): if a is column: return b class Comparator(PropComparator): def __eq__(self, other): if other is None: return sql.and_(*[a==None for a in self.prop.columns]) else: return sql.and_(*[a==b for a, b in zip(self.prop.columns, other.__composite_values__())]) def __ne__(self, other): return sql.or_(*[a!=b for a, b in zip(self.prop.columns, other.__composite_values__())])class SynonymProperty(MapperProperty): def __init__(self, name, map_column=None): self.name = name self.map_column=map_column self.instrument = None def setup(self, querycontext, **kwargs): pass def create_row_processor(self, selectcontext, mapper, row): return (None, None, None) def do_init(self): class_ = self.parent.class_ def comparator(): return self.parent._get_property(self.key, resolve_synonyms=True).comparator self.logger.info("register managed attribute %s on class %s" % (self.key, class_.__name__)) if self.instrument is None: class SynonymProp(object): def __set__(s, obj, value): setattr(obj, self.name, value) def __delete__(s, obj): delattr(obj, self.name) def __get__(s, obj, owner): if obj is None: return s return getattr(obj, self.name) self.instrument = SynonymProp() sessionlib.register_attribute(class_, self.key, uselist=False, proxy_property=self.instrument, useobject=False, comparator=comparator) def merge(self, session, source, dest, _recursive): passSynonymProperty.logger = logging.class_logger(SynonymProperty)class PropertyLoader(StrategizedProperty): """Describes an object property that holds a single item or list of items that correspond to a related database table. """ def __init__(self, argument, secondary=None, primaryjoin=None, secondaryjoin=None, entity_name=None, foreign_keys=None, foreignkey=None, uselist=None, private=False, association=None, order_by=False, attributeext=None, backref=None, is_backref=False, post_update=False, cascade=None, viewonly=False, lazy=True, collection_class=None, passive_deletes=False, passive_updates=True, remote_side=None, enable_typechecks=True, join_depth=None, strategy_class=None): self.uselist = uselist self.argument = argument self.entity_name = entity_name self.secondary = secondary self.primaryjoin = primaryjoin self.secondaryjoin = secondaryjoin self.post_update = post_update self.direction = None self.viewonly = viewonly self.lazy = lazy self.foreign_keys = util.to_set(foreign_keys) self._legacy_foreignkey = util.to_set(foreignkey) if foreignkey: util.warn_deprecated('foreignkey option is deprecated; see docs for details') self.collection_class = collection_class self.passive_deletes = passive_deletes self.passive_updates = passive_updates self.remote_side = util.to_set(remote_side) self.enable_typechecks = enable_typechecks self.__parent_join_cache = weakref.WeakKeyDictionary() self.comparator = PropertyLoader.Comparator(self) self.join_depth = join_depth self.strategy_class = strategy_class self._reverse_property = None if cascade is not None: self.cascade = CascadeOptions(cascade) else: if private: util.warn_deprecated('private option is deprecated; see docs for details') self.cascade = CascadeOptions("all, delete-orphan") else: self.cascade = CascadeOptions("save-update, merge") if self.passive_deletes == 'all' and ("delete" in self.cascade or "delete-orphan" in self.cascade): raise exceptions.ArgumentError("Can't set passive_deletes='all' in conjunction with 'delete' or 'delete-orphan' cascade") self.association = association if association: util.warn_deprecated('association option is deprecated; see docs for details') self.order_by = order_by self.attributeext=attributeext if isinstance(backref, str): # propigate explicitly sent primary/secondary join conditions to the BackRef object if # just a string was sent if secondary is not None: # reverse primary/secondary in case of a many-to-many self.backref = BackRef(backref, primaryjoin=secondaryjoin, secondaryjoin=primaryjoin, passive_updates=self.passive_updates) else: self.backref = BackRef(backref, primaryjoin=primaryjoin, secondaryjoin=secondaryjoin, passive_updates=self.passive_updates) else: self.backref = backref self.is_backref = is_backref class Comparator(PropComparator): def __eq__(self, other): if other is None: if self.prop.uselist: return ~sql.exists([1], self.prop.primaryjoin) else: return self.prop._optimized_compare(None) elif self.prop.uselist: if not hasattr(other, '__iter__'): raise exceptions.InvalidRequestError("Can only compare a collection to an iterable object. Use contains().") else: j = self.prop.primaryjoin if self.prop.secondaryjoin: j = j & self.prop.secondaryjoin clauses = [] for o in other: clauses.append( sql.exists([1], j & sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(o))])) ) return sql.and_(*clauses) else: return self.prop._optimized_compare(other)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -