📄 strategies.py
字号:
# strategies.py
# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""sqlalchemy.orm.interfaces.LoaderStrategy implementations, and related MapperOptions."""

from sqlalchemy import sql, util, exceptions, logging
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import visitors, expression, operators
from sqlalchemy.orm import mapper, attributes
from sqlalchemy.orm.interfaces import LoaderStrategy, StrategizedOption, MapperOption, PropertyOption, serialize_path, deserialize_path
from sqlalchemy.orm import session as sessionlib
from sqlalchemy.orm import util as mapperutil


class ColumnLoader(LoaderStrategy):
    """Default column loader."""

    def init(self):
        """Cache the property's columns, the debug flag and the composite flag."""
        super(ColumnLoader, self).init()
        self.columns = self.parent_property.columns
        self._should_log_debug = logging.is_debug_enabled(self.logger)
        # A property is treated as composite when it carries a
        # ``composite_class`` attribute.
        self.is_composite = hasattr(self.parent_property, 'composite_class')

    def setup_query(self, context, parentclauses=None, **kwargs):
        """Add this property's columns to the query being compiled.

        Without ``parentclauses`` each column is appended to
        ``context.primary_columns``; with it, the aliased form of each column
        (``parentclauses.aliased_column(c)``) is appended to
        ``context.secondary_columns`` instead.
        """
        for c in self.columns:
            if parentclauses is not None:
                context.secondary_columns.append(parentclauses.aliased_column(c))
            else:
                context.primary_columns.append(c)

    def init_class_attribute(self):
        """Register the class-level managed attribute (composite or scalar)."""
        self.is_class_level = True
        if self.is_composite:
            self._init_composite_attribute()
        else:
            self._init_scalar_attribute()

    def _init_composite_attribute(self):
        """Register a managed attribute whose value is a composite object."""
        self.logger.info("register managed composite attribute %s on class %s" % (self.key, self.parent.class_.__name__))

        def copy(obj):
            # Rebuild a fresh composite from the values of the existing one.
            return self.parent_property.composite_class(
                *obj.__composite_values__())

        def compare(a, b):
            # Two composites are equal only if every column type considers
            # the corresponding pair of values equal.
            for col, aprop, bprop in zip(self.columns, a.__composite_values__(), b.__composite_values__()):
                if not col.type.compare_values(aprop, bprop):
                    return False
            return True

        sessionlib.register_attribute(
            self.parent.class_, self.key,
            uselist=False, useobject=False,
            copy_function=copy, compare_function=compare,
            mutable_scalars=True,
            comparator=self.parent_property.comparator)

    def _init_scalar_attribute(self):
        """Register a managed attribute backed by the first mapped column's type."""
        self.logger.info("register managed attribute %s on class %s" % (self.key, self.parent.class_.__name__))
        coltype = self.columns[0].type
        sessionlib.register_attribute(
            self.parent.class_, self.key,
            uselist=False, useobject=False,
            copy_function=coltype.copy_value,
            compare_function=coltype.compare_values,
            mutable_scalars=self.columns[0].type.is_mutable(),
            comparator=self.parent_property.comparator)

    def create_row_processor(self, selectcontext, mapper, row):
        """Return the row-processing callables for this column property.

        The result is always a 3-tuple ``(new_execute, None, None)``:

        * composite property with every column present in ``row``:
          ``new_execute`` builds the composite object from the row values;
        * scalar property whose first column is present in ``row``:
          ``new_execute`` copies the row value into the instance ``__dict__``;
        * otherwise (column(s) absent from ``row``): ``new_execute`` expires
          the attribute on instances flagged ``isnew``.

        A composite property with a column missing from the row previously
        fell off the end of this method and returned ``None`` instead of a
        3-tuple; it now uses the same "absent column" processor as the
        scalar case.
        """
        if self.is_composite:
            for c in self.columns:
                if c not in row:
                    # At least one column is missing: fall through to the
                    # deferring processor at the bottom of the method.
                    break
            else:
                def new_execute(instance, row, **flags):
                    if self._should_log_debug:
                        self.logger.debug("populating %s with %s/%s..." % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
                    instance.__dict__[self.key] = self.parent_property.composite_class(*[row[c] for c in self.columns])
                if self._should_log_debug:
                    self.logger.debug("Returning active composite column fetcher for %s %s" % (mapper, self.key))
                return (new_execute, None, None)
        elif self.columns[0] in row:
            def new_execute(instance, row, **flags):
                if self._should_log_debug:
                    self.logger.debug("populating %s with %s/%s" % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
                instance.__dict__[self.key] = row[self.columns[0]]
            if self._should_log_debug:
                self.logger.debug("Returning active column fetcher for %s %s" % (mapper, self.key))
            return (new_execute, None, None)

        # The mapped column(s) are not available in this row.
        def new_execute(instance, row, isnew, **flags):
            if isnew:
                instance._state.expire_attributes([self.key])
        if self._should_log_debug:
            self.logger.debug("Deferring load for %s %s" % (mapper, self.key))
        return (new_execute, None, None)

ColumnLoader.logger = logging.class_logger(ColumnLoader)


class DeferredColumnLoader(LoaderStrategy):
    """Deferred column loader, a per-column or per-column-group lazy loader."""

    def create_row_processor(self, selectcontext, mapper, row):
        """Return the row-processing callables for a deferred column.

        If the first column is present in ``row`` the work is delegated to
        the property's ``ColumnLoader`` strategy.  Otherwise a 3-tuple
        ``(new_execute, None, None)`` is returned where ``new_execute``
        either installs a per-instance loader callable (when this strategy
        is not class-level, or when ``selectcontext.options`` is non-empty)
        or resets the attribute state for this key.
        """
        if self.columns[0] in row:
            return self.parent_property._get_strategy(ColumnLoader).create_row_processor(selectcontext, mapper, row)
        elif not self.is_class_level or len(selectcontext.options):
            def new_execute(instance, row, **flags):
                if self._should_log_debug:
                    self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
                instance._state.set_callable(self.key, self.setup_loader(instance))
            return (new_execute, None, None)
        else:
            def new_execute(instance, row, **flags):
                if self._should_log_debug:
                    self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
                instance._state.reset(self.key)
            return (new_execute, None, None)

    def init(self):
        """Cache columns, deferral group and debug flag from the parent property.

        Raises NotImplementedError when the parent property is composite
        (i.e. has a ``composite_class`` attribute).
        """
        super(DeferredColumnLoader, self).init()
        if hasattr(self.parent_property, 'composite_class'):
            raise NotImplementedError("Deferred loading for composite types not implemented yet")
        self.columns = self.parent_property.columns
        self.group = self.parent_property.group
        self._should_log_debug = logging.is_debug_enabled(self.logger)

    def init_class_attribute(self):
        """Register the class-level managed attribute with a loader callable."""
        self.is_class_level = True
        self.logger.info("register managed attribute %s on class %s" % (self.key, self.parent.class_.__name__))
        sessionlib.register_attribute(
            self.parent.class_, self.key,
            uselist=False, useobject=False,
            callable_=self.class_level_loader,
            copy_function=self.columns[0].type.copy_value,
            compare_function=self.columns[0].type.compare_values,
            mutable_scalars=self.columns[0].type.is_mutable(),
            comparator=self.parent_property.comparator)

    def setup_query(self, context, only_load_props=None, **kwargs):
        """Include the column in the query only when it has been un-deferred.

        The column is added (via the ``ColumnLoader`` strategy) when its group
        is flagged under ``('undefer', group)`` in ``context.attributes``, or
        when its key is listed in ``only_load_props``.
        """
        if (
            (self.group is not None and context.attributes.get(('undefer', self.group), False)) or
            (only_load_props and self.key in only_load_props)
        ):
            self.parent_property._get_strategy(ColumnLoader).setup_query(context, **kwargs)

    def class_level_loader(self, instance, props=None):
        """Return the loader callable for ``instance``, or None if it is unmapped."""
        if not mapper.has_mapper(instance):
            return None

        localparent = mapper.object_mapper(instance)

        # adjust for the ColumnProperty associated with the instance
        # not being our own ColumnProperty.  This can occur when entity_name
        # mappers are used to map different versions of the same ColumnProperty
        # to the class.
        prop = localparent.get_property(self.key)
        if prop is not self.parent_property:
            return prop._get_strategy(DeferredColumnLoader).setup_loader(instance)

        return LoadDeferredColumns(instance, self.key, props)

    def setup_loader(self, instance, props=None, create_statement=None):
        """Build a ``LoadDeferredColumns`` callable for ``instance``."""
        return LoadDeferredColumns(instance, self.key, props, optimizing_statement=create_statement)

DeferredColumnLoader.logger = logging.class_logger(DeferredColumnLoader)


class LoadDeferredColumns(object):
    """callable, serializable loader object used by DeferredColumnLoader"""

    def __init__(self, instance, key, keys, optimizing_statement=None):
        self.instance = instance
        self.key = key
        self.keys = keys
        self.optimizing_statement = optimizing_statement

    def __getstate__(self):
        # The optimizing statement is deliberately left out of the pickled
        # state; __setstate__ restores it as None.
        return {'instance':self.instance, 'key':self.key, 'keys':self.keys}

    def __setstate__(self, state):
        self.instance = state['instance']
        self.key = state['key']
        self.keys = state['keys']
        self.optimizing_statement = None

    def __call__(self):
        """Load the deferred attribute(s) for ``self.instance``.

        Returns None when the instance has no identity, otherwise
        ``attributes.ATTR_WAS_SET`` after the load has been issued.

        Raises ``exceptions.UnboundExecutionError`` when the instance is not
        attached to a Session.
        """
        if not mapper.has_identity(self.instance):
            return None

        # NOTE(review): raiseerror=False suggests this may return None for an
        # unmapped instance, which would fail on the next line -- confirm.
        localparent = mapper.object_mapper(self.instance, raiseerror=False)

        prop = localparent.get_property(self.key)
        strategy = prop._get_strategy(DeferredColumnLoader)

        # Decide which keys to load: explicit keys first, then every deferred
        # property sharing this strategy's group, else just this key.
        if self.keys:
            toload = self.keys
        elif strategy.group:
            toload = [p.key for p in localparent.iterate_properties if isinstance(p.strategy, DeferredColumnLoader) and p.group==strategy.group]
        else:
            toload = [self.key]

        # narrow the keys down to just those which have no history
        group = [k for k in toload if k in self.instance._state.unmodified]

        if strategy._should_log_debug:
            strategy.logger.debug("deferred load %s group %s" % (mapperutil.attribute_str(self.instance, self.key), group and ','.join(group) or 'None'))

        session = sessionlib.object_session(self.instance)
        if session is None:
            raise exceptions.UnboundExecutionError("Parent instance %s is not bound to a Session; deferred load operation of attribute '%s' cannot proceed" % (self.instance.__class__, self.key))

        query = session.query(localparent)
        if not self.optimizing_statement:
            ident = self.instance._instance_key[1]
            query._get(None, ident=ident, only_load_props=group, refresh_instance=self.instance._state)
        else:
            statement, params = self.optimizing_statement(self.instance)
            query.from_statement(statement).params(params)._get(None, only_load_props=group, refresh_instance=self.instance._state)
        return attributes.ATTR_WAS_SET


class DeferredOption(StrategizedOption):
    """Option selecting the deferred or the default column loader for a key."""

    def __init__(self, key, defer=False):
        super(DeferredOption, self).__init__(key)
        self.defer = defer

    def get_strategy_class(self):
        """Return ``DeferredColumnLoader`` when ``defer`` is set, else ``ColumnLoader``."""
        if self.defer:
            return DeferredColumnLoader
        else:
            return ColumnLoader


class UndeferGroupOption(MapperOption):
    """Option flagging a whole deferral group as un-deferred on a query."""

    def __init__(self, group):
        self.group = group

    def process_query(self, query):
        # DeferredColumnLoader.setup_query looks this flag up under the same
        # ('undefer', group) key.
        query._attributes[('undefer', self.group)] = True


class AbstractRelationLoader(LoaderStrategy):
    """Shared setup and attribute registration for relation loader strategies."""

    def init(self):
        """Copy the relation's configuration from the parent property onto self."""
        super(AbstractRelationLoader, self).init()
        for attr in ['primaryjoin', 'secondaryjoin', 'secondary', 'foreign_keys', 'mapper', 'select_mapper', 'target', 'select_table', 'loads_polymorphic', 'uselist', 'cascade', 'attributeext', 'order_by', 'remote_side', 'polymorphic_primaryjoin', 'polymorphic_secondaryjoin', 'direction']:
            setattr(self, attr, getattr(self.parent_property, attr))
        self._should_log_debug = logging.is_debug_enabled(self.logger)

    def _init_instance_attribute(self, instance, callable_=None):
        """Install ``callable_`` for this key on the instance state, or initialize it."""
        if callable_:
            instance._state.set_callable(self.key, callable_)
        else:
            instance._state.initialize(self.key)

    def _register_attribute(self, class_, callable_=None, **kwargs):
        """Register the managed object attribute for this relation on ``class_``."""
        self.logger.info("register managed %s attribute %s on class %s" % ((self.uselist and "list-holding" or "scalar"), self.key, self.parent.class_.__name__))
        sessionlib.register_attribute(
            class_, self.key,
            uselist=self.uselist, useobject=True,
            extension=self.attributeext, cascade=self.cascade,
            trackparent=True,
            typecallable=self.parent_property.collection_class,
            callable_=callable_,
            comparator=self.parent_property.comparator,
            **kwargs)


class DynaLoader(AbstractRelationLoader):
    def init_class_attribute(self):
        self.is_class_level = True
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -