query.py
来自「SQLAlchemy. 经典的Python ORM框架。学习必看。」· Python 代码 · 共 1,484 行 · 第 1/4 页
PY
1,484 行
if criterion is not None and not isinstance(criterion, sql.ClauseElement): raise exceptions.ArgumentError("filter() argument must be of type sqlalchemy.sql.ClauseElement or string") if self._adapter is not None: criterion = self._adapter.traverse(criterion) q = self._no_statement("filter") if q._criterion is not None: q._criterion = q._criterion & criterion else: q._criterion = criterion return q def filter_by(self, **kwargs): """apply the given filtering criterion to the query and return the newly resulting ``Query``.""" clauses = [self._joinpoint.get_property(key, resolve_synonyms=True).compare(operators.eq, value) for key, value in kwargs.iteritems()] return self.filter(sql.and_(*clauses)) def _get_joinable_tables(self): if not self._joinable_tables or self._joinable_tables[0] is not self._from_obj: currenttables = [self._from_obj] def visit_join(join): currenttables.append(join.left) currenttables.append(join.right) visitors.traverse(self._from_obj, visit_join=visit_join, traverse_options={'column_collections':False, 'aliased_selectables':False}) self._joinable_tables = (self._from_obj, currenttables) return currenttables else: return self._joinable_tables[1] def _join_to(self, keys, outerjoin=False, start=None, create_aliases=True): if start is None: start = self._joinpoint clause = self._from_obj currenttables = self._get_joinable_tables() # determine if generated joins need to be aliased on the left # hand side. if self._adapter and not self._aliases: # at the beginning of a join, look at leftmost adapter adapt_against = self._adapter.selectable elif start.select_table is not start.mapped_table: # in the middle of a join, look for a polymorphic mapper adapt_against = start.select_table else: adapt_against = None mapper = start alias = self._aliases if not isinstance(keys, list): keys = [keys] for key in keys: use_selectable = None if isinstance(key, tuple): key, use_selectable = key if isinstance(key, interfaces.PropComparator): prop = key.property else: prop = mapper.get_property(key, resolve_synonyms=True) if use_selectable: if not use_selectable.is_derived_from(prop.mapper.mapped_table): raise exceptions.InvalidRequestError("Selectable '%s' is not derived from '%s'" % (use_selectable.description, prop.mapper.mapped_table.description)) if not isinstance(use_selectable, expression.Alias): use_selectable = use_selectable.alias() if prop._is_self_referential() and not create_aliases and not use_selectable: raise exceptions.InvalidRequestError("Self-referential query on '%s' property requires create_aliases=True argument." % str(prop)) if prop.select_table not in currenttables or create_aliases or use_selectable: if prop.secondary: if use_selectable: alias = mapperutil.PropertyAliasedClauses(prop, prop.primary_join_against(mapper, adapt_against), prop.secondary_join_against(mapper), alias, alias=use_selectable ) crit = alias.primaryjoin clause = clause.join(alias.secondary, crit, isouter=outerjoin).join(alias.alias, alias.secondaryjoin, isouter=outerjoin) elif create_aliases: alias = mapperutil.PropertyAliasedClauses(prop, prop.primary_join_against(mapper, adapt_against), prop.secondary_join_against(mapper), alias ) crit = alias.primaryjoin clause = clause.join(alias.secondary, crit, isouter=outerjoin).join(alias.alias, alias.secondaryjoin, isouter=outerjoin) else: crit = prop.primary_join_against(mapper, adapt_against) clause = clause.join(prop.secondary, crit, isouter=outerjoin) clause = clause.join(prop.select_table, prop.secondary_join_against(mapper), isouter=outerjoin) else: if use_selectable: alias = mapperutil.PropertyAliasedClauses(prop, prop.primary_join_against(mapper, adapt_against), None, alias, alias=use_selectable ) crit = alias.primaryjoin clause = clause.join(alias.alias, crit, isouter=outerjoin) elif create_aliases: alias = mapperutil.PropertyAliasedClauses(prop, prop.primary_join_against(mapper, adapt_against), None, alias ) crit = alias.primaryjoin clause = clause.join(alias.alias, crit, isouter=outerjoin) else: crit = prop.primary_join_against(mapper, adapt_against) clause = clause.join(prop.select_table, crit, isouter=outerjoin) elif not create_aliases and prop.secondary is not None and prop.secondary not in currenttables: # TODO: this check is not strong enough for different paths to the same endpoint which # does not use secondary tables raise exceptions.InvalidRequestError("Can't join to property '%s'; a path to this table along a different secondary table already exists. Use the `alias=True` argument to `join()`." % prop.key) mapper = prop.mapper if use_selectable: adapt_against = use_selectable elif mapper.select_table is not mapper.mapped_table: adapt_against = mapper.select_table return (clause, mapper, alias) def _generative_col_aggregate(self, col, func): """apply the given aggregate function to the query and return the newly resulting ``Query``. """ if self._column_aggregate is not None: raise exceptions.InvalidRequestError("Query already contains an aggregate column or function") q = self._no_statement("aggregate") q._column_aggregate = (col, func) return q def apply_min(self, col): """apply the SQL ``min()`` function against the given column to the query and return the newly resulting ``Query``. """ return self._generative_col_aggregate(col, sql.func.min) def apply_max(self, col): """apply the SQL ``max()`` function against the given column to the query and return the newly resulting ``Query``. """ return self._generative_col_aggregate(col, sql.func.max) def apply_sum(self, col): """apply the SQL ``sum()`` function against the given column to the query and return the newly resulting ``Query``. """ return self._generative_col_aggregate(col, sql.func.sum) def apply_avg(self, col): """apply the SQL ``avg()`` function against the given column to the query and return the newly resulting ``Query``. """ return self._generative_col_aggregate(col, sql.func.avg) def _col_aggregate(self, col, func): """Execute ``func()`` function against the given column. For performance, only use subselect if `order_by` attribute is set. """ ops = {'distinct':self._distinct, 'order_by':self._order_by or None, 'from_obj':self._from_obj} if self._autoflush and not self._populate_existing: self.session._autoflush() if self._order_by is not False: s1 = sql.select([col], self._criterion, **ops).alias('u') return self.session.execute(sql.select([func(s1.corresponding_column(col))]), mapper=self.mapper).scalar() else: return self.session.execute(sql.select([func(col)], self._criterion, **ops), mapper=self.mapper).scalar() def min(self, col): """Execute the SQL ``min()`` function against the given column.""" return self._col_aggregate(col, sql.func.min) def max(self, col): """Execute the SQL ``max()`` function against the given column.""" return self._col_aggregate(col, sql.func.max) def sum(self, col): """Execute the SQL ``sum()`` function against the given column.""" return self._col_aggregate(col, sql.func.sum) def avg(self, col): """Execute the SQL ``avg()`` function against the given column.""" return self._col_aggregate(col, sql.func.avg) def order_by(self, criterion): """apply one or more ORDER BY criterion to the query and return the newly resulting ``Query``""" q = self._no_statement("order_by") if self._adapter: criterion = [expression._literal_as_text(o) for o in util.to_list(criterion) or []] criterion = self._adapter.copy_and_process(criterion) if q._order_by is False: q._order_by = util.to_list(criterion) else: q._order_by = q._order_by + util.to_list(criterion) return q def group_by(self, criterion): """apply one or more GROUP BY criterion to the query and return the newly resulting ``Query``""" q = self._no_statement("group_by") if q._group_by is False: q._group_by = util.to_list(criterion) else: q._group_by = q._group_by + util.to_list(criterion) return q def having(self, criterion): """apply a HAVING criterion to the query and return the newly resulting ``Query``.""" if isinstance(criterion, basestring): criterion = sql.text(criterion) if criterion is not None and not isinstance(criterion, sql.ClauseElement): raise exceptions.ArgumentError("having() argument must be of type sqlalchemy.sql.ClauseElement or string") if self._adapter is not None: criterion = self._adapter.traverse(criterion) q = self._no_statement("having") if q._having is not None: q._having = q._having & criterion else: q._having = criterion return q def join(self, prop, id=None, aliased=False, from_joinpoint=False): """Create a join against this ``Query`` object's criterion and apply generatively, retunring the newly resulting ``Query``. 'prop' may be one of: * a string property name, i.e. "rooms" * a class-mapped attribute, i.e. Houses.rooms * a 2-tuple containing one of the above, combined with a selectable which derives from the properties' mapped table * a list (not a tuple) containing a combination of any of the above. e.g.:: session.query(Company).join('employees') session.query(Company).join(['employees', 'tasks']) session.query(Houses).join([Colonials.rooms, Room.closets]) session.query(Company).join([('employees', people.join(engineers)), Engineer.computers]) """ return self._join(prop, id=id, outerjoin=False, aliased=aliased, from_joinpoint=from_joinpoint) def outerjoin(self, prop, id=None, aliased=False, from_joinpoint=False): """Create a left outer join against this ``Query`` object's criterion and apply generatively, retunring the newly resulting ``Query``. 'prop' may be one of: * a string property name, i.e. "rooms" * a class-mapped attribute, i.e. Houses.rooms * a 2-tuple containing one of the above, combined with a selectable which derives from the properties' mapped table * a list (not a tuple) containing a combination of any of the above. e.g.:: session.query(Company).outerjoin('employees') session.query(Company).outerjoin(['employees', 'tasks']) session.query(Houses).outerjoin([Colonials.rooms, Room.closets]) session.query(Company).join([('employees', people.join(engineers)), Engineer.computers]) """ return self._join(prop, id=id, outerjoin=True, aliased=aliased, from_joinpoint=from_joinpoint) def _join(self, prop, id, outerjoin, aliased, from_joinpoint): (clause, mapper, aliases) = self._join_to(prop, outerjoin=outerjoin, start=from_joinpoint and self._joinpoint or self.mapper, create_aliases=aliased) q = self._no_statement("join") q._from_obj = clause q._joinpoint = mapper q._aliases = aliases if aliases: q._adapter = sql_util.ClauseAdapter(aliases.alias).copy_and_chain(q._adapter) else: select_mapper = mapper.get_select_mapper() if select_mapper._clause_adapter: q._adapter = select_mapper._clause_adapter.copy_and_chain(q._adapter) a = aliases while a is not None: q._alias_ids.setdefault(a.mapper, []).append(a) q._alias_ids.setdefault(a.table, []).append(a) q._alias_ids.setdefault(a.alias, []).append(a) a = a.parentclauses if id: q._alias_ids[id] = aliases return q def reset_joinpoint(self): """return a new Query reset the 'joinpoint' of this Query reset back to the starting mapper. Subsequent generative calls will be constructed from the new joinpoint. Note that each call to join() or outerjoin() also starts from the root. """ q = self._no_statement("reset_joinpoint") q._joinpoint = q.mapper q._aliases = None if q.table not in q._get_joinable_tables(): q._adapter = sql_util.ClauseAdapter(q._from_obj, equivalents=q.mapper._equivalent_columns) return q def select_from(self, from_obj): """Set the `from_obj` parameter of the query and return the newly resulting ``Query``. This replaces the table which this Query selects from with the given table. `from_obj` is a single table or selectable. """ new = self._no_criterion('select_from') if isinstance(from_obj, (tuple, list)): util.warn_deprecated("select_from() now accepts a single Selectable as its argument, which replaces any existing FROM criterion.") from_obj = from_obj[-1] if isinstance(from_obj, expression._SelectBaseMixin): # alias SELECTs and unions from_obj = from_obj.alias() new._from_obj = from_obj if new.table not in new._get_joinable_tables(): new._adapter = sql_util.ClauseAdapter(new._from_obj, equivalents=new.mapper._equivalent_columns) return new def __getitem__(self, item): if isinstance(item, slice): start = item.start stop = item.stop # if we slice from the end we need to execute the query if (isinstance(start, int) and start < 0) or \ (isinstance(stop, int) and stop < 0): return list(self)[item] else: res = self._clone() if start is not None and stop is not None: res._offset = (self._offset or 0) + start res._limit = stop - start elif start is None and stop is not None: res._limit = stop
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?