⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mapper.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
                    # TODO: this fires off more than needed, try to organize syncrules                    # per table                    for m in util.reversed(list(mapper.iterate_to_root())):                        if m._synchronizer is not None:                            m._synchronizer.execute(state, state)                    # testlib.pragma exempt:__hash__                    inserted_objects.add((state, connection))        if not postupdate:            # call after_XXX extensions            for state, connection, has_identity in tups:                mapper = _state_mapper(state)                if not has_identity:                    if 'after_insert' in mapper.extension.methods:                        mapper.extension.after_insert(mapper, connection, state.obj())                else:                    if 'after_update' in mapper.extension.methods:                        mapper.extension.after_update(mapper, connection, state.obj())    def _postfetch(self, uowtransaction, connection, table, state, resultproxy, params, value_params):        """After an ``INSERT`` or ``UPDATE``, assemble newly generated        values on an instance.  For columns which are marked as being generated        on the database side, set up a group-based "deferred" loader        which will populate those attributes in one query when next accessed.        """        postfetch_cols = util.Set(resultproxy.postfetch_cols()).union(util.Set(value_params.keys()))        deferred_props = []        for c in self._cols_by_table[table]:            if c in postfetch_cols and (not c.key in params or c in value_params):                prop = self._columntoproperty[c]                deferred_props.append(prop.key)            elif not c.primary_key and c.key in params and self._get_state_attr_by_column(state, c) != params[c.key]:                self._set_state_attr_by_column(state, c, params[c.key])        if deferred_props:            if self.eager_defaults:                _instance_key = self._identity_key_from_state(state)                state.dict['_instance_key'] = _instance_key                uowtransaction.session.query(self)._get(_instance_key, refresh_instance=state, only_load_props=deferred_props)            else:                _expire_state(state, deferred_props)    def _delete_obj(self, states, uowtransaction):        """Issue ``DELETE`` statements for a list of objects.        This is called within the context of a UOWTransaction during a        flush operation.        """        if self.__should_log_debug:            self.__log_debug("_delete_obj() start")        if 'connection_callable' in uowtransaction.mapper_flush_opts:            connection_callable = uowtransaction.mapper_flush_opts['connection_callable']            tups = [(state, connection_callable(self, state.obj())) for state in states]        else:            connection = uowtransaction.transaction.connection(self)            tups = [(state, connection) for state in states]        for (state, connection) in tups:            mapper = _state_mapper(state)            if 'before_delete' in mapper.extension.methods:                mapper.extension.before_delete(mapper, connection, state.obj())        deleted_objects = util.Set()        table_to_mapper = {}        for mapper in self.base_mapper.polymorphic_iterator():            for t in mapper.tables:                table_to_mapper[t] = mapper        for table in sqlutil.sort_tables(table_to_mapper.keys(), reverse=True):            delete = {}            for (state, connection) in tups:                mapper = _state_mapper(state)                if table not in mapper._pks_by_table:                    continue                params = {}                if not _state_has_identity(state):                    continue                else:                    delete.setdefault(connection, []).append(params)                for col in mapper._pks_by_table[table]:                    params[col.key] = mapper._get_state_attr_by_column(state, col)                if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):                    params[mapper.version_id_col.key] = mapper._get_state_attr_by_column(state, mapper.version_id_col)                # testlib.pragma exempt:__hash__                deleted_objects.add((state, connection))            for connection, del_objects in delete.iteritems():                mapper = table_to_mapper[table]                def comparator(a, b):                    for col in mapper._pks_by_table[table]:                        x = cmp(a[col.key],b[col.key])                        if x != 0:                            return x                    return 0                del_objects.sort(comparator)                clause = sql.and_()                for col in mapper._pks_by_table[table]:                    clause.clauses.append(col == sql.bindparam(col.key, type_=col.type))                if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):                    clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type))                statement = table.delete(clause)                c = connection.execute(statement, del_objects)                if c.supports_sane_multi_rowcount() and c.rowcount != len(del_objects):                    raise exceptions.ConcurrentModificationError("Deleted rowcount %d does not match number of objects deleted %d" % (c.rowcount, len(del_objects)))        for state, connection in deleted_objects:            mapper = _state_mapper(state)            if 'after_delete' in mapper.extension.methods:                mapper.extension.after_delete(mapper, connection, state.obj())    def _register_dependencies(self, uowcommit):        """Register ``DependencyProcessor`` instances with a        ``unitofwork.UOWTransaction``.        This call `register_dependencies` on all attached        ``MapperProperty`` instances.        """        for prop in self.__props.values():            prop.register_dependencies(uowcommit)        for dep in self._dependency_processors:            dep.register_dependencies(uowcommit)    def cascade_iterator(self, type, state, recursive=None, halt_on=None):        """Iterate each element and its mapper in an object graph,        for all relations that meet the given cascade rule.        type          The name of the cascade rule (i.e. save-update, delete,          etc.)        state          The lead InstanceState.  child items will be processed per          the relations defined for this object's mapper.        recursive          Used by the function for internal context during recursive          calls, leave as None.        the return value are object instances; this provides a strong        reference so that they don't fall out of scope immediately.        """        if recursive is None:            recursive=util.IdentitySet()        for prop in self.__props.values():            for (c, m) in prop.cascade_iterator(type, state, recursive, halt_on=halt_on):                yield (c, m)    def get_select_mapper(self):        """Return the mapper used for issuing selects.        This mapper is the same mapper as `self` unless the        select_table argument was specified for this mapper.        """        return self.__surrogate_mapper or self    def _instance(self, context, row, result=None, skip_polymorphic=False, extension=None, only_load_props=None, refresh_instance=None):        if not extension:            extension = self.extension        if 'translate_row' in extension.methods:            ret = extension.translate_row(self, context, row)            if ret is not EXT_CONTINUE:                row = ret        if not refresh_instance and not skip_polymorphic and self.polymorphic_on:            discriminator = row[self.polymorphic_on]            if discriminator:                mapper = self.polymorphic_map[discriminator]                if mapper is not self:                    if ('polymorphic_fetch', mapper) not in context.attributes:                        context.attributes[('polymorphic_fetch', mapper)] = (self, [t for t in mapper.tables if t not in self.tables])                    row = self.translate_row(mapper, row)                    return mapper._instance(context, row, result=result, skip_polymorphic=True)        # determine identity key        if refresh_instance:            try:                identitykey = refresh_instance.dict['_instance_key']            except KeyError:                # super-rare condition; a refresh is being called                # on a non-instance-key instance; this is meant to only                # occur wihtin a flush()                identitykey = self._identity_key_from_state(refresh_instance)        else:            identitykey = self.identity_key_from_row(row)        session_identity_map = context.session.identity_map        if identitykey in session_identity_map:            instance = session_identity_map[identitykey]            state = instance._state            if self.__should_log_debug:                self.__log_debug("_instance(): using existing instance %s identity %s" % (instance_str(instance), str(identitykey)))            isnew = state.runid != context.runid            currentload = not isnew            if not currentload and context.version_check and self.version_id_col and self._get_attr_by_column(instance, self.version_id_col) != row[self.version_id_col]:                raise exceptions.ConcurrentModificationError("Instance '%s' version of %s does not match %s" % (instance, self._get_attr_by_column(instance, self.version_id_col), row[self.version_id_col]))        elif refresh_instance:            # out of band refresh_instance detected (i.e. its not in the session.identity_map)            # honor it anyway.  this can happen if a _get() occurs within save_obj(), such as            # when eager_defaults is True.            state = refresh_instance            instance = state.obj()            isnew = state.runid != context.runid            currentload = True        else:            if self.__should_log_debug:                self.__log_debug("_instance(): identity key %s not in session" % str(identitykey))            if self.allow_null_pks:                for x in identitykey[1]:                    if x is not None:                        break                else:                    return None            else:                if None in identitykey[1]:                    return None            isnew = True            currentload = True            if 'create_instance' in extension.methods:                instance = extension.create_instance(self, context, row, self.class_)                if instance is EXT_CONTINUE:                    instance = attributes.new_instance(self.class_)                else:                    attributes.manage(instance)            else:                instance = attributes.new_instance(self.class_)            if self.__should_log_debug:                self.__log_debug("_instance(): created new instance %s identity %s" % (instance_str(instance), str(identitykey)))            state = instance._state            instance._entity_name = self.entity_name            instance._instance_key = identitykey            instance._sa_session_id = context.session.hash_key            session_identity_map[identitykey] = instance        if currentload or context.populate_existing or self.always_refresh:            if isnew:                state.runid = context.runid                context.progress.add(state)            if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=only_load_props, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:                self.populate_instance(context, instance, row, only_load_props=only_load_props, instancekey=identitykey, isnew=isnew)                else:            attrs = getattr(state, 'expired_attributes', None)            # populate attributes on non-loading instances which have been expired            # TODO: also support deferred attributes here [ticket:870]            if attrs:                 if state in context.partials:                    isnew = False                    attrs = context.partials[state]                else:                    isnew = True                    attrs = state.expired_attributes.intersection(state.unmodified)                    context.partials[state] = attrs  #<-- allow query.instances to commit the subset of attrs                if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=attrs, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:                    self.populate_instance(context, instance, row, only_load_props=attrs, instancekey=identitykey, isnew=isnew)        if result is not None and ('append_result' not in extension.methods or extension.append_result(self, context, row, instance, result, instancek

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -