📄 mapper.py
字号:
# NOTE(review): the statements down to ``def _postfetch`` are the tail of a
# definition that starts before this excerpt.  Tokens are unchanged; the
# nesting depth was reconstructed from the collapsed text -- confirm against
# the full file.
                    # TODO: this fires off more than needed, try to organize syncrules
                    # per table
                    for m in util.reversed(list(mapper.iterate_to_root())):
                        if m._synchronizer is not None:
                            m._synchronizer.execute(state, state)

                    # testlib.pragma exempt:__hash__
                    inserted_objects.add((state, connection))

        if not postupdate:
            # call after_XXX extensions
            for state, connection, has_identity in tups:
                mapper = _state_mapper(state)
                if not has_identity:
                    if 'after_insert' in mapper.extension.methods:
                        mapper.extension.after_insert(mapper, connection, state.obj())
                else:
                    if 'after_update' in mapper.extension.methods:
                        mapper.extension.after_update(mapper, connection, state.obj())

    def _postfetch(self, uowtransaction, connection, table, state, resultproxy, params, value_params):
        """After an ``INSERT`` or ``UPDATE``, assemble newly generated values
        on an instance.

        Every column in ``self._cols_by_table[table]`` is examined:

        * A column that is in the post-fetch set
          (``resultproxy.postfetch_cols()`` plus the keys of
          ``value_params``), and whose key is missing from ``params`` or
          which is itself a key of ``value_params``, has its property key
          queued for loading.
        * Otherwise, a non-primary-key column whose key is in ``params`` and
          whose current value on ``state`` differs from ``params[c.key]`` is
          overwritten on ``state`` with that value.

        If any property was queued and ``self.eager_defaults`` is true, the
        identity key is computed, stored in ``state.dict['_instance_key']``
        and the queued properties are loaded immediately through the
        session's query (``refresh_instance=state``).  Otherwise the queued
        property keys are handed to ``_expire_state`` -- presumably so they
        are loaded on next access; confirm against ``_expire_state``.

        uowtransaction
          Only ``uowtransaction.session`` is used, for the eager refresh.

        connection
          Not referenced in this method body.

        table
          Key into ``self._cols_by_table``.

        state
          The instance state whose attributes are read and updated.

        resultproxy
          Must provide ``postfetch_cols()``.

        params
          Mapping keyed by column key (``c.key``); presumably the parameters
          of the executed statement -- verify against the caller.

        value_params
          Mapping keyed by column objects (membership is tested with the
          column itself).
        """
        postfetch_cols = util.Set(resultproxy.postfetch_cols()).union(util.Set(value_params.keys()))
        deferred_props = []

        for c in self._cols_by_table[table]:
            if c in postfetch_cols and (not c.key in params or c in value_params):
                prop = self._columntoproperty[c]
                deferred_props.append(prop.key)
            elif not c.primary_key and c.key in params and self._get_state_attr_by_column(state, c) != params[c.key]:
                self._set_state_attr_by_column(state, c, params[c.key])

        if deferred_props:
            if self.eager_defaults:
                # the identity key is stored on the state before the refresh query is issued
                _instance_key = self._identity_key_from_state(state)
                state.dict['_instance_key'] = _instance_key
                uowtransaction.session.query(self)._get(_instance_key, refresh_instance=state, only_load_props=deferred_props)
            else:
                _expire_state(state, deferred_props)

    def _delete_obj(self, states, uowtransaction):
        """Issue ``DELETE`` statements for a list of objects.

        This is called within the context of a UOWTransaction during a flush
        operation.

        What the body does, in order:

        1. Pairs each state with a connection: the result of
           ``uowtransaction.mapper_flush_opts['connection_callable']`` when
           that key is present, otherwise the single connection returned by
           ``uowtransaction.transaction.connection(self)``.
        2. Calls the ``before_delete`` extension hook of each state's mapper
           when the hook name is in ``mapper.extension.methods``.
        3. Collects the tables of every mapper yielded by
           ``self.base_mapper.polymorphic_iterator()`` and walks them in the
           order returned by ``sqlutil.sort_tables(..., reverse=True)``.
        4. Per table, builds one parameter dict for each state that has an
           identity and whose mapper has primary keys for that table,
           grouped by connection.
        5. Per connection, sorts the parameter dicts by primary key value
           and executes one ``table.delete(...)`` statement with the whole
           list.
        6. Calls the ``after_delete`` extension hook for every
           ``(state, connection)`` pair that contributed parameters.

        Raises ``exceptions.ConcurrentModificationError`` when the result
        reports ``supports_sane_multi_rowcount()`` and its ``rowcount``
        differs from the number of parameter sets sent.
        """
        if self.__should_log_debug:
            self.__log_debug("_delete_obj() start")

        if 'connection_callable' in uowtransaction.mapper_flush_opts:
            connection_callable = uowtransaction.mapper_flush_opts['connection_callable']
            tups = [(state, connection_callable(self, state.obj())) for state in states]
        else:
            connection = uowtransaction.transaction.connection(self)
            tups = [(state, connection) for state in states]

        for (state, connection) in tups:
            mapper = _state_mapper(state)
            if 'before_delete' in mapper.extension.methods:
                mapper.extension.before_delete(mapper, connection, state.obj())

        deleted_objects = util.Set()
        # table -> mapper; if several mappers list the same table, the one
        # iterated last wins
        table_to_mapper = {}
        for mapper in self.base_mapper.polymorphic_iterator():
            for t in mapper.tables:
                table_to_mapper[t] = mapper

        for table in sqlutil.sort_tables(table_to_mapper.keys(), reverse=True):
            # connection -> list of parameter dicts for this table
            delete = {}
            for (state, connection) in tups:
                mapper = _state_mapper(state)
                if table not in mapper._pks_by_table:
                    continue

                params = {}
                if not _state_has_identity(state):
                    continue
                else:
                    # the (still empty) dict is registered first and filled
                    # in place by the statements below
                    delete.setdefault(connection, []).append(params)
                for col in mapper._pks_by_table[table]:
                    params[col.key] = mapper._get_state_attr_by_column(state, col)
                if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
                    params[mapper.version_id_col.key] = mapper._get_state_attr_by_column(state, mapper.version_id_col)
                # testlib.pragma exempt:__hash__
                deleted_objects.add((state, connection))
            for connection, del_objects in delete.iteritems():
                mapper = table_to_mapper[table]
                # cmp-style ordering of two parameter dicts: compare primary
                # key values column by column, first difference decides
                def comparator(a, b):
                    for col in mapper._pks_by_table[table]:
                        x = cmp(a[col.key],b[col.key])
                        if x != 0:
                            return x
                    return 0
                del_objects.sort(comparator)
                # WHERE clause: one bind parameter per primary key column,
                # plus the version id column when it belongs to this table
                clause = sql.and_()
                for col in mapper._pks_by_table[table]:
                    clause.clauses.append(col == sql.bindparam(col.key, type_=col.type))
                if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
                    clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type))
                statement = table.delete(clause)
                c = connection.execute(statement, del_objects)
                if c.supports_sane_multi_rowcount() and c.rowcount != len(del_objects):
                    raise exceptions.ConcurrentModificationError("Deleted rowcount %d does not match number of objects deleted %d" % (c.rowcount, len(del_objects)))

        for state, connection in deleted_objects:
            mapper = _state_mapper(state)
            if 'after_delete' in mapper.extension.methods:
                mapper.extension.after_delete(mapper, connection, state.obj())

    def _register_dependencies(self, uowcommit):
        """Register ``DependencyProcessor`` instances with a
        ``unitofwork.UOWTransaction``.

        This calls ``register_dependencies(uowcommit)`` on every value of
        ``self.__props`` (the attached ``MapperProperty`` instances) and
        then on every entry of ``self._dependency_processors``.
        """
        for prop in self.__props.values():
            prop.register_dependencies(uowcommit)
        for dep in self._dependency_processors:
            dep.register_dependencies(uowcommit)

    def cascade_iterator(self, type, state, recursive=None, halt_on=None):
        """Iterate each element and its mapper in an object graph, for all
        relations that meet the given cascade rule.

        This is a generator: it yields the ``(c, m)`` pairs produced by the
        ``cascade_iterator`` of every property in ``self.__props``.

        type
          The name of the cascade rule (i.e. save-update, delete, etc.)

        state
          The lead InstanceState.  Child items will be processed per the
          relations defined for this object's mapper.

        recursive
          Used by the function for internal context during recursive calls,
          leave as None.  When None, a new ``util.IdentitySet`` is created
          and shared by all property iterators of this call.

        halt_on
          Passed through unchanged to each property's ``cascade_iterator``.

        The returned values are object instances; this provides a strong
        reference so that they don't fall out of scope immediately.
        """
        if recursive is None:
            recursive=util.IdentitySet()
        for prop in self.__props.values():
            for (c, m) in prop.cascade_iterator(type, state, recursive, halt_on=halt_on):
                yield (c, m)

    def get_select_mapper(self):
        """Return the mapper used for issuing selects.

        This is ``self.__surrogate_mapper`` when that attribute is truthy,
        otherwise ``self``.  The mapper is the same mapper as `self` unless
        the select_table argument was specified for this mapper.
        """
        return self.__surrogate_mapper or self

    # NOTE(review): ``_instance`` runs past the end of this excerpt; its final
    # statement below is cut off mid-expression exactly as in the original.
    # Tokens are unchanged; nesting was reconstructed from the collapsed text.
    def _instance(self, context, row, result=None, skip_polymorphic=False, extension=None, only_load_props=None, refresh_instance=None):
        if not extension:
            extension = self.extension

        if 'translate_row' in extension.methods:
            ret = extension.translate_row(self, context, row)
            if ret is not EXT_CONTINUE:
                row = ret

        if not refresh_instance and not skip_polymorphic and self.polymorphic_on:
            discriminator = row[self.polymorphic_on]
            if discriminator:
                mapper = self.polymorphic_map[discriminator]
                if mapper is not self:
                    if ('polymorphic_fetch', mapper) not in context.attributes:
                        context.attributes[('polymorphic_fetch', mapper)] = (self, [t for t in mapper.tables if t not in self.tables])
                    row = self.translate_row(mapper, row)
                    return mapper._instance(context, row, result=result, skip_polymorphic=True)

        # determine identity key
        if refresh_instance:
            try:
                identitykey = refresh_instance.dict['_instance_key']
            except KeyError:
                # super-rare condition; a refresh is being called
                # on a non-instance-key instance; this is meant to only
                # occur within a flush()
                identitykey = self._identity_key_from_state(refresh_instance)
        else:
            identitykey = self.identity_key_from_row(row)

        session_identity_map = context.session.identity_map

        if identitykey in session_identity_map:
            instance = session_identity_map[identitykey]
            state = instance._state

            if self.__should_log_debug:
                self.__log_debug("_instance(): using existing instance %s identity %s" % (instance_str(instance), str(identitykey)))

            isnew = state.runid != context.runid
            currentload = not isnew

            if not currentload and context.version_check and self.version_id_col and self._get_attr_by_column(instance, self.version_id_col) != row[self.version_id_col]:
                raise exceptions.ConcurrentModificationError("Instance '%s' version of %s does not match %s" % (instance, self._get_attr_by_column(instance, self.version_id_col), row[self.version_id_col]))
        elif refresh_instance:
            # out of band refresh_instance detected (i.e. it's not in the session.identity_map)
            # honor it anyway.  this can happen if a _get() occurs within save_obj(), such as
            # when eager_defaults is True.
            state = refresh_instance
            instance = state.obj()
            isnew = state.runid != context.runid
            currentload = True
        else:
            if self.__should_log_debug:
                self.__log_debug("_instance(): identity key %s not in session" % str(identitykey))

            if self.allow_null_pks:
                for x in identitykey[1]:
                    if x is not None:
                        break
                else:
                    return None
            else:
                if None in identitykey[1]:
                    return None
            isnew = True
            currentload = True

            if 'create_instance' in extension.methods:
                instance = extension.create_instance(self, context, row, self.class_)
                if instance is EXT_CONTINUE:
                    instance = attributes.new_instance(self.class_)
                else:
                    attributes.manage(instance)
            else:
                instance = attributes.new_instance(self.class_)

            if self.__should_log_debug:
                self.__log_debug("_instance(): created new instance %s identity %s" % (instance_str(instance), str(identitykey)))

            state = instance._state
            instance._entity_name = self.entity_name
            instance._instance_key = identitykey
            instance._sa_session_id = context.session.hash_key
            session_identity_map[identitykey] = instance

        if currentload or context.populate_existing or self.always_refresh:
            if isnew:
                state.runid = context.runid
                context.progress.add(state)

            if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=only_load_props, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
                self.populate_instance(context, instance, row, only_load_props=only_load_props, instancekey=identitykey, isnew=isnew)

        else:
            attrs = getattr(state, 'expired_attributes', None)
            # populate attributes on non-loading instances which have been expired
            # TODO: also support deferred attributes here [ticket:870]
            if attrs:
                if state in context.partials:
                    isnew = False
                    attrs = context.partials[state]
                else:
                    isnew = True
                    attrs = state.expired_attributes.intersection(state.unmodified)
                    context.partials[state] = attrs  #<-- allow query.instances to commit the subset of attrs

                if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=attrs, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
                    self.populate_instance(context, instance, row, only_load_props=attrs, instancekey=identitykey, isnew=isnew)

        if result is not None and ('append_result' not in extension.methods or extension.append_result(self, context, row, instance, result, instancek
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -