📄 mapper.py
字号:
raise exceptions.InvalidRequestError("No contextual Session is established.") def instances(self, cursor, session, *mappers, **kwargs): """Return a list of mapped instances corresponding to the rows in a given ResultProxy. DEPRECATED. """ import sqlalchemy.orm.query return sqlalchemy.orm.Query(self, session).instances(cursor, *mappers, **kwargs) instances = util.deprecated(instances, add_deprecation_to_docstring=False) def identity_key_from_row(self, row): """Return an identity-map key for use in storing/retrieving an item from the identity map. row A ``sqlalchemy.engine.base.RowProxy`` instance or a dictionary corresponding result-set ``ColumnElement`` instances to their values within a row. """ return (self._identity_class, tuple([row[column] for column in self.primary_key]), self.entity_name) def identity_key_from_primary_key(self, primary_key): """Return an identity-map key for use in storing/retrieving an item from an identity map. primary_key A list of values indicating the identifier. """ return (self._identity_class, tuple(util.to_list(primary_key)), self.entity_name) def identity_key_from_instance(self, instance): """Return the identity key for the given instance, based on its primary key attributes. This value is typically also found on the instance itself under the attribute name `_instance_key`. """ return self.identity_key_from_primary_key(self.primary_key_from_instance(instance)) def _identity_key_from_state(self, state): return self.identity_key_from_primary_key(self._primary_key_from_state(state)) def primary_key_from_instance(self, instance): """Return the list of primary key values for the given instance. """ return [self._get_state_attr_by_column(instance._state, column) for column in self.primary_key] def _primary_key_from_state(self, state): return [self._get_state_attr_by_column(state, column) for column in self.primary_key] def _canload(self, state): if self.polymorphic_on is not None: return issubclass(state.class_, self.class_) else: return state.class_ is self.class_ def _get_col_to_prop(self, column): try: return self._columntoproperty[column] except KeyError: prop = self.__props.get(column.key, None) if prop: raise exceptions.UnmappedColumnError("Column '%s.%s' is not available, due to conflicting property '%s':%s" % (column.table.name, column.name, column.key, repr(prop))) else: raise exceptions.UnmappedColumnError("No column %s.%s is configured on mapper %s..." % (column.table.name, column.name, str(self))) def _get_state_attr_by_column(self, state, column): return self._get_col_to_prop(column).getattr(state, column) def _set_state_attr_by_column(self, state, column, value): return self._get_col_to_prop(column).setattr(state, value, column) def _get_attr_by_column(self, obj, column): return self._get_col_to_prop(column).getattr(obj._state, column) def _get_committed_attr_by_column(self, obj, column): return self._get_col_to_prop(column).getcommitted(obj._state, column) def _set_attr_by_column(self, obj, column, value): self._get_col_to_prop(column).setattr(obj._state, column, value) def _save_obj(self, states, uowtransaction, postupdate=False, post_update_cols=None, single=False): """Issue ``INSERT`` and/or ``UPDATE`` statements for a list of objects. This is called within the context of a UOWTransaction during a flush operation. `_save_obj` issues SQL statements not just for instances mapped directly by this mapper, but for instances mapped by all inheriting mappers as well. This is to maintain proper insert ordering among a polymorphic chain of instances. Therefore _save_obj is typically called only on a *base mapper*, or a mapper which does not inherit from any other mapper. """ if self.__should_log_debug: self.__log_debug("_save_obj() start, " + (single and "non-batched" or "batched")) # if batch=false, call _save_obj separately for each object if not single and not self.batch: for state in states: self._save_obj([state], uowtransaction, postupdate=postupdate, post_update_cols=post_update_cols, single=True) return # if session has a connection callable, # organize individual states with the connection to use for insert/update if 'connection_callable' in uowtransaction.mapper_flush_opts: connection_callable = uowtransaction.mapper_flush_opts['connection_callable'] tups = [(state, connection_callable(self, state.obj()), _state_has_identity(state)) for state in states] else: connection = uowtransaction.transaction.connection(self) tups = [(state, connection, _state_has_identity(state)) for state in states] if not postupdate: # call before_XXX extensions for state, connection, has_identity in tups: mapper = _state_mapper(state) if not has_identity: if 'before_insert' in mapper.extension.methods: mapper.extension.before_insert(mapper, connection, state.obj()) else: if 'before_update' in mapper.extension.methods: mapper.extension.before_update(mapper, connection, state.obj()) for state, connection, has_identity in tups: # detect if we have a "pending" instance (i.e. has no instance_key attached to it), # and another instance with the same identity key already exists as persistent. convert to an # UPDATE if so. mapper = _state_mapper(state) instance_key = mapper._identity_key_from_state(state) if not postupdate and not has_identity and instance_key in uowtransaction.uow.identity_map: existing = uowtransaction.uow.identity_map[instance_key]._state if not uowtransaction.is_deleted(existing): raise exceptions.FlushError("New instance %s with identity key %s conflicts with persistent instance %s" % (state_str(state), str(instance_key), state_str(existing))) if self.__should_log_debug: self.__log_debug("detected row switch for identity %s. will update %s, remove %s from transaction" % (instance_key, state_str(state), state_str(existing))) uowtransaction.set_row_switch(existing) inserted_objects = util.Set() updated_objects = util.Set() table_to_mapper = {} for mapper in self.base_mapper.polymorphic_iterator(): for t in mapper.tables: table_to_mapper[t] = mapper for table in sqlutil.sort_tables(table_to_mapper.keys()): insert = [] update = [] for state, connection, has_identity in tups: mapper = _state_mapper(state) if table not in mapper._pks_by_table: continue pks = mapper._pks_by_table[table] instance_key = mapper._identity_key_from_state(state) if self.__should_log_debug: self.__log_debug("_save_obj() table '%s' instance %s identity %s" % (table.name, state_str(state), str(instance_key))) isinsert = not instance_key in uowtransaction.uow.identity_map and not postupdate and not has_identity params = {} value_params = {} hasdata = False if isinsert: for col in mapper._cols_by_table[table]: if col is mapper.version_id_col: params[col.key] = 1 elif col in pks: value = mapper._get_state_attr_by_column(state, col) if value is not None: params[col.key] = value elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col): if self.__should_log_debug: self.__log_debug("Using polymorphic identity '%s' for insert column '%s'" % (mapper.polymorphic_identity, col.key)) value = mapper.polymorphic_identity if col.default is None or value is not None: params[col.key] = value else: value = mapper._get_state_attr_by_column(state, col) if col.default is None or value is not None: if isinstance(value, sql.ClauseElement): value_params[col] = value else: params[col.key] = value insert.append((state, params, mapper, connection, value_params)) else: for col in mapper._cols_by_table[table]: if col is mapper.version_id_col: params[col._label] = mapper._get_state_attr_by_column(state, col) params[col.key] = params[col._label] + 1 for prop in mapper._columntoproperty.values(): (added, unchanged, deleted) = attributes.get_history(state, prop.key, passive=True) if added: hasdata = True elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col): pass else: if post_update_cols is not None and col not in post_update_cols: if col in pks: params[col._label] = mapper._get_state_attr_by_column(state, col) continue prop = mapper._columntoproperty[col] (added, unchanged, deleted) = attributes.get_history(state, prop.key, passive=True) if added: if isinstance(added[0], sql.ClauseElement): value_params[col] = added[0] else: params[col.key] = prop.get_col_value(col, added[0]) if col in pks: if deleted: params[col._label] = deleted[0] else: # row switch logic can reach us here params[col._label] = added[0] hasdata = True elif col in pks: params[col._label] = mapper._get_state_attr_by_column(state, col) if hasdata: update.append((state, params, mapper, connection, value_params)) if update: mapper = table_to_mapper[table] clause = sql.and_() for col in mapper._pks_by_table[table]: clause.clauses.append(col == sql.bindparam(col._label, type_=col.type)) if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col): clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type)) statement = table.update(clause) pks = mapper._pks_by_table[table] def comparator(a, b): for col in pks: x = cmp(a[1][col._label],b[1][col._label]) if x != 0: return x return 0 update.sort(comparator) rows = 0 for rec in update: (state, params, mapper, connection, value_params) = rec c = connection.execute(statement.values(value_params), params) mapper._postfetch(uowtransaction, connection, table, state, c, c.last_updated_params(), value_params) # testlib.pragma exempt:__hash__ updated_objects.add((state, connection)) rows += c.rowcount if c.supports_sane_rowcount() and rows != len(update): raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update))) if insert: statement = table.insert() def comparator(a, b): return cmp(a[0].insert_order, b[0].insert_order) insert.sort(comparator) for rec in insert: (state, params, mapper, connection, value_params) = rec c = connection.execute(statement.values(value_params), params) primary_key = c.last_inserted_ids() if primary_key is not None: # set primary key attributes for i, col in enumerate(mapper._pks_by_table[table]): if mapper._get_state_attr_by_column(state, col) is None and len(primary_key) > i: mapper._set_state_attr_by_column(state, col, primary_key[i]) mapper._postfetch(uowtransaction, connection, table, state, c, c.last_inserted_params(), value_params) # synchronize newly inserted ids from one table to the next
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -