⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mapper.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
        raise exceptions.InvalidRequestError("No contextual Session is established.")    def instances(self, cursor, session, *mappers, **kwargs):        """Return a list of mapped instances corresponding to the rows        in a given ResultProxy.        DEPRECATED.        """        import sqlalchemy.orm.query        return sqlalchemy.orm.Query(self, session).instances(cursor, *mappers, **kwargs)    instances = util.deprecated(instances, add_deprecation_to_docstring=False)    def identity_key_from_row(self, row):        """Return an identity-map key for use in storing/retrieving an        item from the identity map.        row          A ``sqlalchemy.engine.base.RowProxy`` instance or a          dictionary corresponding result-set ``ColumnElement``          instances to their values within a row.        """        return (self._identity_class, tuple([row[column] for column in self.primary_key]), self.entity_name)    def identity_key_from_primary_key(self, primary_key):        """Return an identity-map key for use in storing/retrieving an        item from an identity map.        primary_key          A list of values indicating the identifier.        """        return (self._identity_class, tuple(util.to_list(primary_key)), self.entity_name)    def identity_key_from_instance(self, instance):        """Return the identity key for the given instance, based on        its primary key attributes.        This value is typically also found on the instance itself        under the attribute name `_instance_key`.        """        return self.identity_key_from_primary_key(self.primary_key_from_instance(instance))    def _identity_key_from_state(self, state):        return self.identity_key_from_primary_key(self._primary_key_from_state(state))    def primary_key_from_instance(self, instance):        """Return the list of primary key values for the given        instance.        """        return [self._get_state_attr_by_column(instance._state, column) for column in self.primary_key]    def _primary_key_from_state(self, state):        return [self._get_state_attr_by_column(state, column) for column in self.primary_key]    def _canload(self, state):        if self.polymorphic_on is not None:            return issubclass(state.class_, self.class_)        else:            return state.class_ is self.class_    def _get_col_to_prop(self, column):        try:            return self._columntoproperty[column]        except KeyError:            prop = self.__props.get(column.key, None)            if prop:                raise exceptions.UnmappedColumnError("Column '%s.%s' is not available, due to conflicting property '%s':%s" % (column.table.name, column.name, column.key, repr(prop)))            else:                raise exceptions.UnmappedColumnError("No column %s.%s is configured on mapper %s..." % (column.table.name, column.name, str(self)))    def _get_state_attr_by_column(self, state, column):        return self._get_col_to_prop(column).getattr(state, column)    def _set_state_attr_by_column(self, state, column, value):        return self._get_col_to_prop(column).setattr(state, value, column)    def _get_attr_by_column(self, obj, column):        return self._get_col_to_prop(column).getattr(obj._state, column)    def _get_committed_attr_by_column(self, obj, column):        return self._get_col_to_prop(column).getcommitted(obj._state, column)    def _set_attr_by_column(self, obj, column, value):        self._get_col_to_prop(column).setattr(obj._state, column, value)    def _save_obj(self, states, uowtransaction, postupdate=False, post_update_cols=None, single=False):        """Issue ``INSERT`` and/or ``UPDATE`` statements for a list of objects.        This is called within the context of a UOWTransaction during a        flush operation.        `_save_obj` issues SQL statements not just for instances mapped        directly by this mapper, but for instances mapped by all        inheriting mappers as well.  This is to maintain proper insert        ordering among a polymorphic chain of instances. Therefore        _save_obj is typically called only on a *base mapper*, or a        mapper which does not inherit from any other mapper.        """        if self.__should_log_debug:            self.__log_debug("_save_obj() start, " + (single and "non-batched" or "batched"))        # if batch=false, call _save_obj separately for each object        if not single and not self.batch:            for state in states:                self._save_obj([state], uowtransaction, postupdate=postupdate, post_update_cols=post_update_cols, single=True)            return        # if session has a connection callable,        # organize individual states with the connection to use for insert/update        if 'connection_callable' in uowtransaction.mapper_flush_opts:            connection_callable = uowtransaction.mapper_flush_opts['connection_callable']            tups = [(state, connection_callable(self, state.obj()), _state_has_identity(state)) for state in states]        else:            connection = uowtransaction.transaction.connection(self)            tups = [(state, connection, _state_has_identity(state)) for state in states]        if not postupdate:            # call before_XXX extensions            for state, connection, has_identity in tups:                mapper = _state_mapper(state)                if not has_identity:                    if 'before_insert' in mapper.extension.methods:                        mapper.extension.before_insert(mapper, connection, state.obj())                else:                    if 'before_update' in mapper.extension.methods:                        mapper.extension.before_update(mapper, connection, state.obj())        for state, connection, has_identity in tups:            # detect if we have a "pending" instance (i.e. has no instance_key attached to it),            # and another instance with the same identity key already exists as persistent.  convert to an            # UPDATE if so.            mapper = _state_mapper(state)            instance_key = mapper._identity_key_from_state(state)            if not postupdate and not has_identity and instance_key in uowtransaction.uow.identity_map:                existing = uowtransaction.uow.identity_map[instance_key]._state                if not uowtransaction.is_deleted(existing):                    raise exceptions.FlushError("New instance %s with identity key %s conflicts with persistent instance %s" % (state_str(state), str(instance_key), state_str(existing)))                if self.__should_log_debug:                    self.__log_debug("detected row switch for identity %s.  will update %s, remove %s from transaction" % (instance_key, state_str(state), state_str(existing)))                uowtransaction.set_row_switch(existing)        inserted_objects = util.Set()        updated_objects = util.Set()        table_to_mapper = {}        for mapper in self.base_mapper.polymorphic_iterator():            for t in mapper.tables:                table_to_mapper[t] = mapper        for table in sqlutil.sort_tables(table_to_mapper.keys()):            insert = []            update = []            for state, connection, has_identity in tups:                mapper = _state_mapper(state)                if table not in mapper._pks_by_table:                    continue                pks = mapper._pks_by_table[table]                instance_key = mapper._identity_key_from_state(state)                if self.__should_log_debug:                    self.__log_debug("_save_obj() table '%s' instance %s identity %s" % (table.name, state_str(state), str(instance_key)))                isinsert = not instance_key in uowtransaction.uow.identity_map and not postupdate and not has_identity                params = {}                value_params = {}                hasdata = False                if isinsert:                    for col in mapper._cols_by_table[table]:                        if col is mapper.version_id_col:                            params[col.key] = 1                        elif col in pks:                            value = mapper._get_state_attr_by_column(state, col)                            if value is not None:                                params[col.key] = value                        elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col):                            if self.__should_log_debug:                                self.__log_debug("Using polymorphic identity '%s' for insert column '%s'" % (mapper.polymorphic_identity, col.key))                            value = mapper.polymorphic_identity                            if col.default is None or value is not None:                                params[col.key] = value                        else:                            value = mapper._get_state_attr_by_column(state, col)                            if col.default is None or value is not None:                                if isinstance(value, sql.ClauseElement):                                    value_params[col] = value                                else:                                    params[col.key] = value                    insert.append((state, params, mapper, connection, value_params))                else:                    for col in mapper._cols_by_table[table]:                        if col is mapper.version_id_col:                            params[col._label] = mapper._get_state_attr_by_column(state, col)                            params[col.key] = params[col._label] + 1                            for prop in mapper._columntoproperty.values():                                (added, unchanged, deleted) = attributes.get_history(state, prop.key, passive=True)                                if added:                                    hasdata = True                        elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col):                            pass                        else:                            if post_update_cols is not None and col not in post_update_cols:                                if col in pks:                                    params[col._label] = mapper._get_state_attr_by_column(state, col)                                continue                            prop = mapper._columntoproperty[col]                            (added, unchanged, deleted) = attributes.get_history(state, prop.key, passive=True)                            if added:                                if isinstance(added[0], sql.ClauseElement):                                    value_params[col] = added[0]                                else:                                    params[col.key] = prop.get_col_value(col, added[0])                                if col in pks:                                    if deleted:                                        params[col._label] = deleted[0]                                    else:                                        # row switch logic can reach us here                                        params[col._label] = added[0]                                hasdata = True                            elif col in pks:                                params[col._label] = mapper._get_state_attr_by_column(state, col)                    if hasdata:                        update.append((state, params, mapper, connection, value_params))            if update:                mapper = table_to_mapper[table]                clause = sql.and_()                for col in mapper._pks_by_table[table]:                    clause.clauses.append(col == sql.bindparam(col._label, type_=col.type))                if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):                    clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type))                statement = table.update(clause)                pks = mapper._pks_by_table[table]                def comparator(a, b):                    for col in pks:                        x = cmp(a[1][col._label],b[1][col._label])                        if x != 0:                            return x                    return 0                update.sort(comparator)                rows = 0                for rec in update:                    (state, params, mapper, connection, value_params) = rec                    c = connection.execute(statement.values(value_params), params)                    mapper._postfetch(uowtransaction, connection, table, state, c, c.last_updated_params(), value_params)                    # testlib.pragma exempt:__hash__                    updated_objects.add((state, connection))                    rows += c.rowcount                if c.supports_sane_rowcount() and rows != len(update):                    raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))            if insert:                statement = table.insert()                def comparator(a, b):                    return cmp(a[0].insert_order, b[0].insert_order)                insert.sort(comparator)                for rec in insert:                    (state, params, mapper, connection, value_params) = rec                    c = connection.execute(statement.values(value_params), params)                    primary_key = c.last_inserted_ids()                    if primary_key is not None:                        # set primary key attributes                        for i, col in enumerate(mapper._pks_by_table[table]):                            if mapper._get_state_attr_by_column(state, col) is None and len(primary_key) > i:                                mapper._set_state_attr_by_column(state, col, primary_key[i])                    mapper._postfetch(uowtransaction, connection, table, state, c, c.last_inserted_params(), value_params)                    # synchronize newly inserted ids from one table to the next

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -