⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 strategies.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
# strategies.py
# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""sqlalchemy.orm.interfaces.LoaderStrategy implementations, and related MapperOptions."""

from sqlalchemy import sql, util, exceptions, logging
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import visitors, expression, operators
from sqlalchemy.orm import mapper, attributes
from sqlalchemy.orm.interfaces import LoaderStrategy, StrategizedOption, MapperOption, PropertyOption, serialize_path, deserialize_path
from sqlalchemy.orm import session as sessionlib
from sqlalchemy.orm import util as mapperutil


class ColumnLoader(LoaderStrategy):
    """Default column loader.

    Handles both plain column properties and "composite" properties; a
    property is treated as composite when it carries a ``composite_class``
    attribute.
    """

    def init(self):
        """Cache the property's columns, debug-logging flag and composite flag."""
        super(ColumnLoader, self).init()
        self.columns = self.parent_property.columns
        self._should_log_debug = logging.is_debug_enabled(self.logger)
        self.is_composite = hasattr(self.parent_property, 'composite_class')

    def setup_query(self, context, parentclauses=None, **kwargs):
        """Add this property's columns to the query context.

        With ``parentclauses`` given, the aliased form of each column is
        appended to ``context.secondary_columns``; otherwise the column
        itself is appended to ``context.primary_columns``.
        """
        for c in self.columns:
            if parentclauses is not None:
                context.secondary_columns.append(parentclauses.aliased_column(c))
            else:
                context.primary_columns.append(c)

    def init_class_attribute(self):
        """Register the managed attribute (composite or scalar) on the mapped class."""
        self.is_class_level = True
        if self.is_composite:
            self._init_composite_attribute()
        else:
            self._init_scalar_attribute()

    def _init_composite_attribute(self):
        """Register a composite attribute with custom copy/compare functions."""
        self.logger.info("register managed composite attribute %s on class %s" % (self.key, self.parent.class_.__name__))

        def copy(obj):
            # A None value has no __composite_values__(); pass it through
            # rather than raising AttributeError.
            if obj is None:
                return None
            return self.parent_property.composite_class(
                *obj.__composite_values__())

        def compare(a, b):
            # If either side is None there is nothing to compare column by
            # column; the two are equal only when both are None.
            if a is None or b is None:
                return a is b
            for col, aprop, bprop in zip(self.columns,
                                         a.__composite_values__(),
                                         b.__composite_values__()):
                if not col.type.compare_values(aprop, bprop):
                    return False
            return True

        sessionlib.register_attribute(self.parent.class_, self.key, uselist=False, useobject=False, copy_function=copy, compare_function=compare, mutable_scalars=True, comparator=self.parent_property.comparator)

    def _init_scalar_attribute(self):
        """Register a scalar attribute using the first column's type for copy/compare."""
        self.logger.info("register managed attribute %s on class %s" % (self.key, self.parent.class_.__name__))
        coltype = self.columns[0].type
        sessionlib.register_attribute(self.parent.class_, self.key, uselist=False, useobject=False, copy_function=coltype.copy_value, compare_function=coltype.compare_values, mutable_scalars=self.columns[0].type.is_mutable(), comparator=self.parent_property.comparator)

    def create_row_processor(self, selectcontext, mapper, row):
        """Return a ``(new_execute, None, None)`` tuple for the given row.

        ``new_execute`` populates the attribute from the row when the needed
        column(s) are present in ``row``; otherwise it expires the attribute
        on new instances so that it is loaded later.

        A 3-tuple is returned on every path.  Previously a composite
        property whose columns were not all present in the row fell out of
        the function and implicitly returned None; that case now uses the
        same deferring processor as a missing scalar column.
        """
        if self.is_composite:
            for c in self.columns:
                if c not in row:
                    # at least one composite column is missing: fall
                    # through to the deferring processor below
                    break
            else:
                def new_execute(instance, row, **flags):
                    if self._should_log_debug:
                        self.logger.debug("populating %s with %s/%s..." % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
                    instance.__dict__[self.key] = self.parent_property.composite_class(*[row[c] for c in self.columns])
                if self._should_log_debug:
                    self.logger.debug("Returning active composite column fetcher for %s %s" % (mapper, self.key))
                return (new_execute, None, None)

        elif self.columns[0] in row:
            def new_execute(instance, row, **flags):
                if self._should_log_debug:
                    self.logger.debug("populating %s with %s/%s" % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
                instance.__dict__[self.key] = row[self.columns[0]]
            if self._should_log_debug:
                self.logger.debug("Returning active column fetcher for %s %s" % (mapper, self.key))
            return (new_execute, None, None)

        # Column(s) not available in this row: expire the attribute on
        # newly created instances only.
        def new_execute(instance, row, isnew, **flags):
            if isnew:
                instance._state.expire_attributes([self.key])
        if self._should_log_debug:
            self.logger.debug("Deferring load for %s %s" % (mapper, self.key))
        return (new_execute, None, None)

ColumnLoader.logger = logging.class_logger(ColumnLoader)


class DeferredColumnLoader(LoaderStrategy):
    """Deferred column loader, a per-column or per-column-group lazy loader."""

    def create_row_processor(self, selectcontext, mapper, row):
        """Return a ``(new_execute, None, None)`` tuple for the given row.

        If the column is present in the row, the work is delegated to the
        property's ColumnLoader strategy.  Otherwise the returned callable
        either installs a per-instance deferred loader, or resets the
        attribute's state.
        """
        if self.columns[0] in row:
            return self.parent_property._get_strategy(ColumnLoader).create_row_processor(selectcontext, mapper, row)
        elif not self.is_class_level or len(selectcontext.options):
            def new_execute(instance, row, **flags):
                if self._should_log_debug:
                    self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
                instance._state.set_callable(self.key, self.setup_loader(instance))
            return (new_execute, None, None)
        else:
            def new_execute(instance, row, **flags):
                # NOTE(review): the message below matches the branch above,
                # but this branch resets the attribute state rather than
                # setting a callable.
                if self._should_log_debug:
                    self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
                instance._state.reset(self.key)
            return (new_execute, None, None)

    def init(self):
        """Cache columns, deferral group and debug flag.

        Raises NotImplementedError for composite properties.
        """
        super(DeferredColumnLoader, self).init()
        if hasattr(self.parent_property, 'composite_class'):
            raise NotImplementedError("Deferred loading for composite types not implemented yet")
        self.columns = self.parent_property.columns
        self.group = self.parent_property.group
        self._should_log_debug = logging.is_debug_enabled(self.logger)

    def init_class_attribute(self):
        """Register the managed attribute with ``class_level_loader`` as its callable."""
        self.is_class_level = True
        self.logger.info("register managed attribute %s on class %s" % (self.key, self.parent.class_.__name__))
        sessionlib.register_attribute(self.parent.class_, self.key, uselist=False, useobject=False, callable_=self.class_level_loader, copy_function=self.columns[0].type.copy_value, compare_function=self.columns[0].type.compare_values, mutable_scalars=self.columns[0].type.is_mutable(), comparator=self.parent_property.comparator)

    def setup_query(self, context, only_load_props=None, **kwargs):
        """Include the column in the query only when it has been undeferred.

        That is the case when this property's group is flagged with an
        ``('undefer', group)`` entry in ``context.attributes``, or when the
        key is listed in ``only_load_props``.
        """
        if \
            (self.group is not None and context.attributes.get(('undefer', self.group), False)) or \
            (only_load_props and self.key in only_load_props):

            self.parent_property._get_strategy(ColumnLoader).setup_query(context, **kwargs)

    def class_level_loader(self, instance, props=None):
        """Return a loader callable for ``instance``, or None if it is not mapped."""
        if not mapper.has_mapper(instance):
            return None

        localparent = mapper.object_mapper(instance)

        # adjust for the ColumnProperty associated with the instance
        # not being our own ColumnProperty.  This can occur when entity_name
        # mappers are used to map different versions of the same ColumnProperty
        # to the class.
        prop = localparent.get_property(self.key)
        if prop is not self.parent_property:
            return prop._get_strategy(DeferredColumnLoader).setup_loader(instance)

        return LoadDeferredColumns(instance, self.key, props)

    def setup_loader(self, instance, props=None, create_statement=None):
        """Build a LoadDeferredColumns callable for ``instance``."""
        return LoadDeferredColumns(instance, self.key, props, optimizing_statement=create_statement)

DeferredColumnLoader.logger = logging.class_logger(DeferredColumnLoader)


class LoadDeferredColumns(object):
    """callable, serializable loader object used by DeferredColumnLoader"""

    def __init__(self, instance, key, keys, optimizing_statement=None):
        self.instance = instance
        self.key = key
        self.keys = keys
        self.optimizing_statement = optimizing_statement

    def __getstate__(self):
        # optimizing_statement is deliberately left out of the pickled state
        return {'instance':self.instance, 'key':self.key, 'keys':self.keys}

    def __setstate__(self, state):
        self.instance = state['instance']
        self.key = state['key']
        self.keys = state['keys']
        # not part of the pickled state; see __getstate__
        self.optimizing_statement = None

    def __call__(self):
        """Load the deferred attribute(s) for the stored instance.

        Returns None when the instance has no identity, otherwise
        ``attributes.ATTR_WAS_SET`` after issuing the load through the
        instance's session.

        Raises exceptions.UnboundExecutionError when the instance is not
        attached to a session.
        """
        if not mapper.has_identity(self.instance):
            return None

        localparent = mapper.object_mapper(self.instance, raiseerror=False)

        prop = localparent.get_property(self.key)
        strategy = prop._get_strategy(DeferredColumnLoader)

        # Decide which keys to load: the explicit list, else every deferred
        # property in the same group, else just this key.
        if self.keys:
            toload = self.keys
        elif strategy.group:
            toload = [p.key for p in localparent.iterate_properties if isinstance(p.strategy, DeferredColumnLoader) and p.group==strategy.group]
        else:
            toload = [self.key]

        # narrow the keys down to just those which have no history
        group = [k for k in toload if k in self.instance._state.unmodified]

        if strategy._should_log_debug:
            strategy.logger.debug("deferred load %s group %s" % (mapperutil.attribute_str(self.instance, self.key), group and ','.join(group) or 'None'))

        session = sessionlib.object_session(self.instance)
        if session is None:
            raise exceptions.UnboundExecutionError("Parent instance %s is not bound to a Session; deferred load operation of attribute '%s' cannot proceed" % (self.instance.__class__, self.key))

        query = session.query(localparent)
        if not self.optimizing_statement:
            ident = self.instance._instance_key[1]
            query._get(None, ident=ident, only_load_props=group, refresh_instance=self.instance._state)
        else:
            statement, params = self.optimizing_statement(self.instance)
            query.from_statement(statement).params(params)._get(None, only_load_props=group, refresh_instance=self.instance._state)
        return attributes.ATTR_WAS_SET


class DeferredOption(StrategizedOption):
    """Option selecting DeferredColumnLoader or ColumnLoader for a key."""

    def __init__(self, key, defer=False):
        super(DeferredOption, self).__init__(key)
        self.defer = defer

    def get_strategy_class(self):
        """Return DeferredColumnLoader when ``defer`` is true, else ColumnLoader."""
        if self.defer:
            return DeferredColumnLoader
        else:
            return ColumnLoader


class UndeferGroupOption(MapperOption):
    """Option flagging a deferred column group as undeferred on a query."""

    def __init__(self, group):
        self.group = group

    def process_query(self, query):
        # read back by DeferredColumnLoader.setup_query via context.attributes
        # -- presumably populated from query._attributes; verify against caller
        query._attributes[('undefer', self.group)] = True


class AbstractRelationLoader(LoaderStrategy):
    """Base for relation loader strategies; mirrors relation settings from the parent property."""

    def init(self):
        """Copy the listed relation attributes from the parent property onto self."""
        super(AbstractRelationLoader, self).init()
        for attr in ['primaryjoin', 'secondaryjoin', 'secondary', 'foreign_keys', 'mapper', 'select_mapper', 'target', 'select_table', 'loads_polymorphic', 'uselist', 'cascade', 'attributeext', 'order_by', 'remote_side', 'polymorphic_primaryjoin', 'polymorphic_secondaryjoin', 'direction']:
            setattr(self, attr, getattr(self.parent_property, attr))
        self._should_log_debug = logging.is_debug_enabled(self.logger)

    def _init_instance_attribute(self, instance, callable_=None):
        """Set ``callable_`` on the instance's attribute state, or initialize it when none is given."""
        if callable_:
            instance._state.set_callable(self.key, callable_)
        else:
            instance._state.initialize(self.key)

    def _register_attribute(self, class_, callable_=None, **kwargs):
        """Register this relation as a managed object attribute on ``class_``."""
        self.logger.info("register managed %s attribute %s on class %s" % ((self.uselist and "list-holding" or "scalar"), self.key, self.parent.class_.__name__))
        sessionlib.register_attribute(class_, self.key, uselist=self.uselist, useobject=True, extension=self.attributeext, cascade=self.cascade,  trackparent=True, typecallable=self.parent_property.collection_class, callable_=callable_, comparator=self.parent_property.comparator, **kwargs)


class DynaLoader(AbstractRelationLoader):
    def init_class_attribute(self):
        self.is_class_level = True

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -