query.py

来自「SQLAlchemy. 经典的Python ORM框架。学习必看。」· Python 代码 · 共 1,484 行 · 第 1/4 页

PY
1,484
字号
        if criterion is not None and not isinstance(criterion, sql.ClauseElement):
            raise exceptions.ArgumentError("filter() argument must be of type sqlalchemy.sql.ClauseElement or string")

        # when an adapter is present, run the criterion through it first
        if self._adapter is not None:
            criterion = self._adapter.traverse(criterion)

        q = self._no_statement("filter")
        # combine with any criterion already on the query using the ``&`` operator
        if q._criterion is not None:
            q._criterion = q._criterion & criterion
        else:
            q._criterion = criterion
        return q

    def filter_by(self, **kwargs):
        """Apply the given filtering criterion to the query and return the newly resulting ``Query``.

        Each keyword names a property looked up on the current joinpoint
        (synonyms are resolved); the property is compared to the keyword's
        value with ``operators.eq``.  All comparisons are combined with
        ``sql.and_()`` and handed to ``filter()``.
        """

        clauses = [self._joinpoint.get_property(key, resolve_synonyms=True).compare(operators.eq, value)
            for key, value in kwargs.iteritems()]

        return self.filter(sql.and_(*clauses))

    def _get_joinable_tables(self):
        """Return the list of selectables found in this query's FROM object.

        The list starts with ``self._from_obj`` itself, followed by the left
        and right side of every join visited while traversing it.  The result
        is cached in ``self._joinable_tables`` as ``(from_obj, tables)`` and
        is rebuilt whenever ``self._from_obj`` is no longer the cached object.
        """
        if not self._joinable_tables or self._joinable_tables[0] is not self._from_obj:
            currenttables = [self._from_obj]
            def visit_join(join):
                currenttables.append(join.left)
                currenttables.append(join.right)
            visitors.traverse(self._from_obj, visit_join=visit_join, traverse_options={'column_collections':False, 'aliased_selectables':False})
            self._joinable_tables = (self._from_obj, currenttables)
            return currenttables
        else:
            return self._joinable_tables[1]

    def _join_to(self, keys, outerjoin=False, start=None, create_aliases=True):
        """Build a join clause from ``self._from_obj`` along the given keys.

        keys
          a single key or a list of keys.  Each key is either something
          accepted by ``mapper.get_property()``, an
          ``interfaces.PropComparator``, or a tuple of
          ``(key, selectable)`` where the selectable is used as the join
          target (it is aliased if it is not already an ``Alias``).

        outerjoin
          passed as ``isouter`` to every ``join()`` call that is generated.

        start
          the mapper to begin from; defaults to ``self._joinpoint``.

        create_aliases
          when true, every join target is wrapped in a
          ``PropertyAliasedClauses`` and joined via its alias.

        Returns a tuple ``(clause, mapper, alias)``: the resulting FROM
        clause, the mapper of the last property traversed, and the most
        recently created ``PropertyAliasedClauses`` (or ``self._aliases``
        when none was created).

        Raises ``exceptions.InvalidRequestError`` when a supplied selectable
        is not derived from the property's mapped table, when a
        self-referential property is joined without aliasing, or when an
        un-aliased join would reuse a table whose secondary table has not
        been joined yet.
        """
        if start is None:
            start = self._joinpoint

        clause = self._from_obj

        currenttables = self._get_joinable_tables()

        # determine if generated joins need to be aliased on the left
        # hand side.
        if self._adapter and not self._aliases:  # at the beginning of a join, look at leftmost adapter
            adapt_against = self._adapter.selectable
        elif start.select_table is not start.mapped_table: # in the middle of a join, look for a polymorphic mapper
            adapt_against = start.select_table
        else:
            adapt_against = None

        mapper = start
        alias = self._aliases

        if not isinstance(keys, list):
            keys = [keys]
        for key in keys:
            use_selectable = None
            # a tuple key carries an explicit selectable to join to
            if isinstance(key, tuple):
                key, use_selectable = key

            if isinstance(key, interfaces.PropComparator):
                prop = key.property
            else:
                prop = mapper.get_property(key, resolve_synonyms=True)

            if use_selectable:
                if not use_selectable.is_derived_from(prop.mapper.mapped_table):
                    raise exceptions.InvalidRequestError("Selectable '%s' is not derived from '%s'" % (use_selectable.description, prop.mapper.mapped_table.description))
                if not isinstance(use_selectable, expression.Alias):
                    use_selectable = use_selectable.alias()

            if prop._is_self_referential() and not create_aliases and not use_selectable:
                raise exceptions.InvalidRequestError("Self-referential query on '%s' property requires create_aliases=True argument." % str(prop))

            # a join is generated when the target is not yet in the FROM
            # clause, or whenever aliasing / an explicit selectable is requested
            if prop.select_table not in currenttables or create_aliases or use_selectable:
                if prop.secondary:
                    # property uses a secondary table: two joins are
                    # generated, first to the secondary, then to the target
                    if use_selectable:
                        alias = mapperutil.PropertyAliasedClauses(prop,
                            prop.primary_join_against(mapper, adapt_against),
                            prop.secondary_join_against(mapper),
                            alias,
                            alias=use_selectable
                        )
                        crit = alias.primaryjoin
                        clause = clause.join(alias.secondary, crit, isouter=outerjoin).join(alias.alias, alias.secondaryjoin, isouter=outerjoin)
                    elif create_aliases:
                        alias = mapperutil.PropertyAliasedClauses(prop,
                            prop.primary_join_against(mapper, adapt_against),
                            prop.secondary_join_against(mapper),
                            alias
                        )
                        crit = alias.primaryjoin
                        clause = clause.join(alias.secondary, crit, isouter=outerjoin).join(alias.alias, alias.secondaryjoin, isouter=outerjoin)
                    else:
                        crit = prop.primary_join_against(mapper, adapt_against)
                        clause = clause.join(prop.secondary, crit, isouter=outerjoin)
                        clause = clause.join(prop.select_table, prop.secondary_join_against(mapper), isouter=outerjoin)
                else:
                    # no secondary table: a single join to the target
                    if use_selectable:
                        alias = mapperutil.PropertyAliasedClauses(prop,
                            prop.primary_join_against(mapper, adapt_against),
                            None,
                            alias,
                            alias=use_selectable
                        )
                        crit = alias.primaryjoin
                        clause = clause.join(alias.alias, crit, isouter=outerjoin)
                    elif create_aliases:
                        alias = mapperutil.PropertyAliasedClauses(prop,
                            prop.primary_join_against(mapper, adapt_against),
                            None,
                            alias
                        )
                        crit = alias.primaryjoin
                        clause = clause.join(alias.alias, crit, isouter=outerjoin)
                    else:
                        crit = prop.primary_join_against(mapper, adapt_against)
                        clause = clause.join(prop.select_table, crit, isouter=outerjoin)
            elif not create_aliases and prop.secondary is not None and prop.secondary not in currenttables:
                # TODO: this check is not strong enough for different paths to the same endpoint which
                # does not use secondary tables
                raise exceptions.InvalidRequestError("Can't join to property '%s'; a path to this table along a different secondary table already exists.  Use the `alias=True` argument to `join()`." % prop.key)

            # the next key is resolved against the mapper just joined to
            mapper = prop.mapper

            if use_selectable:
                adapt_against = use_selectable
            elif mapper.select_table is not mapper.mapped_table:
                adapt_against = mapper.select_table

        return (clause, mapper, alias)

    def _generative_col_aggregate(self, col, func):
        """Apply the given aggregate function to the query and return the newly
        resulting ``Query``.

        The ``(col, func)`` pair is stored on the new query as
        ``_column_aggregate``.  Raises ``exceptions.InvalidRequestError`` if
        this query already has one.
        """
        if self._column_aggregate is not None:
            raise exceptions.InvalidRequestError("Query already contains an aggregate column or function")
        q = self._no_statement("aggregate")
        q._column_aggregate = (col, func)
        return q

    def apply_min(self, col):
        """apply the SQL ``min()`` function against the given column to the
        query and return the newly resulting ``Query``.
        """
        return self._generative_col_aggregate(col, sql.func.min)

    def apply_max(self, col):
        """apply the SQL ``max()`` function against the given column to the
        query and return the newly resulting ``Query``.
        """
        return self._generative_col_aggregate(col, sql.func.max)

    def apply_sum(self, col):
        """apply the SQL ``sum()`` function against the given column to the
        query and return the newly resulting ``Query``.
        """
        return self._generative_col_aggregate(col, sql.func.sum)

    def apply_avg(self, col):
        """apply the SQL ``avg()`` function against the given column to the
        query and return the newly resulting ``Query``.
        """
        return self._generative_col_aggregate(col, sql.func.avg)

    def _col_aggregate(self, col, func):
        """Execute ``func()`` function against the given column.

        For performance, only use subselect if `order_by` attribute is set.

        The statement is executed immediately on ``self.session`` and the
        scalar result is returned.  The session is autoflushed first when
        ``_autoflush`` is set and ``_populate_existing`` is not.
        """

        ops = {'distinct':self._distinct, 'order_by':self._order_by or None, 'from_obj':self._from_obj}

        if self._autoflush and not self._populate_existing:
            self.session._autoflush()

        # ``_order_by`` stays ``False`` until order_by() assigns it; any other
        # value takes the subselect path
        if self._order_by is not False:
            s1 = sql.select([col], self._criterion, **ops).alias('u')
            return self.session.execute(sql.select([func(s1.corresponding_column(col))]), mapper=self.mapper).scalar()
        else:
            return self.session.execute(sql.select([func(col)], self._criterion, **ops), mapper=self.mapper).scalar()

    def min(self, col):
        """Execute the SQL ``min()`` function against the given column."""

        return self._col_aggregate(col, sql.func.min)

    def max(self, col):
        """Execute the SQL ``max()`` function against the given column."""

        return self._col_aggregate(col, sql.func.max)

    def sum(self, col):
        """Execute the SQL ``sum()`` function against the given column."""

        return self._col_aggregate(col, sql.func.sum)

    def avg(self, col):
        """Execute the SQL ``avg()`` function against the given column."""

        return self._col_aggregate(col, sql.func.avg)

    def order_by(self, criterion):
        """Apply one or more ORDER BY criterion to the query and return the newly resulting ``Query``.

        `criterion` may be a single element or a list.  Repeated calls append
        to the ordering already present.  When the query has an adapter, each
        element is converted with ``expression._literal_as_text()`` and then
        processed by the adapter.
        """

        q = self._no_statement("order_by")

        if self._adapter:
            criterion = [expression._literal_as_text(o) for o in util.to_list(criterion) or []]
            criterion = self._adapter.copy_and_process(criterion)

        # ``False`` means no ordering has been assigned yet
        if q._order_by is False:
            q._order_by = util.to_list(criterion)
        else:
            q._order_by = q._order_by + util.to_list(criterion)
        return q

    def group_by(self, criterion):
        """Apply one or more GROUP BY criterion to the query and return the newly resulting ``Query``.

        `criterion` may be a single element or a list.  Repeated calls append
        to the grouping already present.
        """

        q = self._no_statement("group_by")
        # ``False`` means no grouping has been assigned yet
        if q._group_by is False:
            q._group_by = util.to_list(criterion)
        else:
            q._group_by = q._group_by + util.to_list(criterion)
        return q

    def having(self, criterion):
        """Apply a HAVING criterion to the query and return the newly resulting ``Query``.

        `criterion` is a ``sql.ClauseElement`` or a string; a string is
        converted with ``sql.text()``.  Any other non-None value raises
        ``exceptions.ArgumentError``.  Repeated calls combine the criteria
        with the ``&`` operator.
        """

        if isinstance(criterion, basestring):
            criterion = sql.text(criterion)

        if criterion is not None and not isinstance(criterion, sql.ClauseElement):
            raise exceptions.ArgumentError("having() argument must be of type sqlalchemy.sql.ClauseElement or string")

        if self._adapter is not None:
            criterion = self._adapter.traverse(criterion)

        q = self._no_statement("having")
        if q._having is not None:
            q._having = q._having & criterion
        else:
            q._having = criterion
        return q

    def join(self, prop, id=None, aliased=False, from_joinpoint=False):
        """Create a join against this ``Query`` object's criterion
        and apply generatively, returning the newly resulting ``Query``.

        'prop' may be one of:
          * a string property name, i.e. "rooms"
          * a class-mapped attribute, i.e. Houses.rooms
          * a 2-tuple containing one of the above, combined with a selectable
            which derives from the properties' mapped table
          * a list (not a tuple) containing a combination of any of the above.

        'id', when given, is recorded as a key under which the aliases
        created for this join are stored on the new ``Query``.

        'aliased', when true, causes every join target to be aliased.

        'from_joinpoint', when true, starts the join from the current
        joinpoint instead of from the query's base mapper.

        e.g.::

            session.query(Company).join('employees')
            session.query(Company).join(['employees', 'tasks'])
            session.query(Houses).join([Colonials.rooms, Room.closets])
            session.query(Company).join([('employees', people.join(engineers)), Engineer.computers])

        """

        return self._join(prop, id=id, outerjoin=False, aliased=aliased, from_joinpoint=from_joinpoint)

    def outerjoin(self, prop, id=None, aliased=False, from_joinpoint=False):
        """Create a left outer join against this ``Query`` object's criterion
        and apply generatively, returning the newly resulting ``Query``.

        'prop' may be one of:
          * a string property name, i.e. "rooms"
          * a class-mapped attribute, i.e. Houses.rooms
          * a 2-tuple containing one of the above, combined with a selectable
            which derives from the properties' mapped table
          * a list (not a tuple) containing a combination of any of the above.

        'id', when given, is recorded as a key under which the aliases
        created for this join are stored on the new ``Query``.

        'aliased', when true, causes every join target to be aliased.

        'from_joinpoint', when true, starts the join from the current
        joinpoint instead of from the query's base mapper.

        e.g.::

            session.query(Company).outerjoin('employees')
            session.query(Company).outerjoin(['employees', 'tasks'])
            session.query(Houses).outerjoin([Colonials.rooms, Room.closets])
            session.query(Company).outerjoin([('employees', people.join(engineers)), Engineer.computers])

        """

        return self._join(prop, id=id, outerjoin=True, aliased=aliased, from_joinpoint=from_joinpoint)

    def _join(self, prop, id, outerjoin, aliased, from_joinpoint):
        """Shared implementation behind ``join()`` and ``outerjoin()``.

        Builds the join with ``_join_to()``, then returns a new ``Query``
        whose FROM object, joinpoint, aliases and adapter reflect it.  Every
        alias in the resulting chain is registered in ``_alias_ids`` under its
        mapper, table and alias; if `id` is given the chain is also stored
        under that key.
        """
        # start from the current joinpoint only when explicitly requested,
        # otherwise from the query's base mapper
        (clause, mapper, aliases) = self._join_to(prop, outerjoin=outerjoin, start=from_joinpoint and self._joinpoint or self.mapper, create_aliases=aliased)
        q = self._no_statement("join")
        q._from_obj = clause
        q._joinpoint = mapper
        q._aliases = aliases

        if aliases:
            q._adapter = sql_util.ClauseAdapter(aliases.alias).copy_and_chain(q._adapter)
        else:
            select_mapper = mapper.get_select_mapper()
            if select_mapper._clause_adapter:
                q._adapter = select_mapper._clause_adapter.copy_and_chain(q._adapter)

        # walk the alias chain back through its parents, registering each link
        a = aliases
        while a is not None:
            q._alias_ids.setdefault(a.mapper, []).append(a)
            q._alias_ids.setdefault(a.table, []).append(a)
            q._alias_ids.setdefault(a.alias, []).append(a)
            a = a.parentclauses

        if id:
            q._alias_ids[id] = aliases
        return q

    def reset_joinpoint(self):
        """Return a new ``Query`` with its 'joinpoint' reset back to the
        starting mapper.  Subsequent generative calls will
        be constructed from the new joinpoint.

        Note that each call to join() or outerjoin() also starts from
        the root, unless ``from_joinpoint=True`` is passed.
        """

        q = self._no_statement("reset_joinpoint")
        q._joinpoint = q.mapper
        q._aliases = None
        # install an adapter against the FROM object when the query's own
        # table is not among its joinable tables
        if q.table not in q._get_joinable_tables():
            q._adapter = sql_util.ClauseAdapter(q._from_obj, equivalents=q.mapper._equivalent_columns)
        return q


    def select_from(self, from_obj):
        """Set the `from_obj` parameter of the query and return the newly
        resulting ``Query``.  This replaces the table which this Query selects
        from with the given table.


        `from_obj` is a single table or selectable.  Passing a tuple or list
        is deprecated; a warning is issued and only the last element is used.
        Objects that are instances of ``expression._SelectBaseMixin`` are
        aliased before use.
        """

        new = self._no_criterion('select_from')
        if isinstance(from_obj, (tuple, list)):
            util.warn_deprecated("select_from() now accepts a single Selectable as its argument, which replaces any existing FROM criterion.")
            from_obj = from_obj[-1]

        if isinstance(from_obj, expression._SelectBaseMixin):
            # alias SELECTs and unions
            from_obj = from_obj.alias()

        new._from_obj = from_obj

        # install an adapter against the new FROM object when the query's own
        # table is not among its joinable tables
        if new.table not in new._get_joinable_tables():
            new._adapter = sql_util.ClauseAdapter(new._from_obj, equivalents=new.mapper._equivalent_columns)
        return new

    def __getitem__(self, item):
        if isinstance(item, slice):
            start = item.start
            stop = item.stop
            # if we slice from the end we need to execute the query
            if (isinstance(start, int) and start < 0) or \
               (isinstance(stop, int) and stop < 0):
                return list(self)[item]
            else:
                res = self._clone()
                # express the slice as offset/limit on a clone, relative to
                # any offset already present on this query
                if start is not None and stop is not None:
                    res._offset = (self._offset or 0) + start
                    res._limit = stop - start
                elif start is None and stop is not None:
                    res._limit = stop

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?