📄 query.py
字号:
elif start is not None and stop is None: res._offset = (self._offset or 0) + start if item.step is not None: return list(res)[None:None:item.step] else: return res else: return list(self[item:item+1])[0]

    def limit(self, limit):
        """Apply a ``LIMIT`` to the query and return the newly resulting
        ``Query``.

        Implemented as the slice ``self[:limit]``.
        """

        return self[:limit]

    def offset(self, offset):
        """Apply an ``OFFSET`` to the query and return the newly resulting
        ``Query``.

        Implemented as the slice ``self[offset:]``.
        """

        return self[offset:]

    def distinct(self):
        """Apply a ``DISTINCT`` to the query and return the newly resulting
        ``Query``.

        The ``_distinct`` flag is set on, and the return value is, the object
        handed back by ``self._no_statement("distinct")``.
        """

        new = self._no_statement("distinct")
        new._distinct = True
        return new

    def all(self):
        """Return the results represented by this ``Query`` as a list.

        This results in an execution of the underlying query; it is
        ``list(self)``.
        """

        return list(self)

    def from_statement(self, statement):
        """Return a query that will run the given SELECT statement.

        Nothing is executed here: the statement is stored as ``_statement``
        on the object returned by ``self._no_criterion('from_statement')``
        and that object is returned.

        This method bypasses all internal statement compilation, and the
        statement is executed without modification.

        The statement argument is either a string, a ``select()`` construct,
        or a ``text()`` construct, and should return the set of columns
        appropriate to the entity class represented by this ``Query``.
        A string is wrapped with ``sql.text()`` first.

        Also see the ``instances()`` method.
        """

        if isinstance(statement, basestring):
            statement = sql.text(statement)
        q = self._no_criterion('from_statement')
        q._statement = statement
        return q

    def first(self):
        """Return the first result of this ``Query`` or None if the result
        doesn't contain any row.

        When ``_column_aggregate`` is set, the value of
        ``self._col_aggregate(*self._column_aggregate)`` is returned instead.

        This results in an execution of the underlying query.
        """

        if self._column_aggregate is not None:
            return self._col_aggregate(*self._column_aggregate)

        # slice [0:1] so that at most one row is requested
        ret = list(self[0:1])
        if len(ret) > 0:
            return ret[0]
        else:
            return None

    def one(self):
        """Return the single result of this ``Query``.

        When ``_column_aggregate`` is set, the value of
        ``self._col_aggregate(*self._column_aggregate)`` is returned instead.

        Raises ``exceptions.InvalidRequestError`` if the query returns no
        rows, and also if it returns more than one row.

        This results in an execution of the underlying query.
        """

        if self._column_aggregate is not None:
            return self._col_aggregate(*self._column_aggregate)

        # slice [0:2]: two rows are enough to tell "exactly one" from "many"
        ret = list(self[0:2])

        if len(ret) == 1:
            return ret[0]
        elif len(ret) == 0:
            raise exceptions.InvalidRequestError('No rows returned for one()')
        else:
            raise exceptions.InvalidRequestError('Multiple rows returned for one()')

    def __iter__(self):
        """Compile and execute this query, returning an iterator of results.

        ``use_labels`` is switched on for the compiled statement, and the
        session is autoflushed first when ``_autoflush`` is set and
        ``_populate_existing`` is not.
        """
        context = self._compile_context()
        context.statement.use_labels = True
        if self._autoflush and not self._populate_existing:
            self.session._autoflush()
        return self._execute_and_instances(context)

    def _execute_and_instances(self, querycontext):
        """Execute ``querycontext.statement`` on the session and return
        ``iterate_instances()`` over the result, reusing ``querycontext``.
        """
        result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)
        return self.iterate_instances(result, querycontext=querycontext)

    def instances(self, cursor, *mappers_or_columns, **kwargs):
        """Return ``iterate_instances()`` for the same arguments, collected
        into a list.
        """
        return list(self.iterate_instances(cursor, *mappers_or_columns, **kwargs))

    def iterate_instances(self, cursor, *mappers_or_columns, **kwargs):
        """Generator turning the rows of ``cursor`` into results.

        cursor
          object providing ``fetchall()`` and ``fetchmany(n)``.

        \*mappers_or_columns
          extra entities, appended after ``self._entities``.  Each item is
          either a 3-tuple ``(m, alias, alias_id)`` or a bare ``m``; ``m`` may
          be a class (resolved with ``mapper.class_mapper``), a
          ``mapper.Mapper``, a ``sql.ColumnElement`` or a string.  Anything
          else raises ``exceptions.InvalidRequestError``.

        \**kwargs
          only ``querycontext`` is read; when absent a fresh
          ``QueryContext(self)`` is used.  Other keywords are ignored.

        When there are no extra entities each yielded item is what
        ``select_mapper._instance()`` returned for a row; otherwise each item
        is a tuple whose first element is that value, followed by one element
        per extra entity.

        With ``_yield_per`` set, rows are fetched and yielded in batches of
        that size until an empty batch comes back; otherwise everything is
        fetched with a single ``fetchall()``.
        """
        # NOTE(review): ``session`` is not referenced again in this method.
        session = self.session

        context = kwargs.pop('querycontext', None)
        if context is None:
            context = QueryContext(self)

        context.runid = _new_runid()

        mappers_or_columns = tuple(self._entities) + mappers_or_columns
        tuples = bool(mappers_or_columns)

        # ``main`` produces the primary entity for a row; the row is passed
        # through context.row_adapter first when one is present.
        if context.row_adapter:
            def main(context, row):
                return self.select_mapper._instance(context, context.row_adapter(row), None,
                    extension=context.extension, only_load_props=context.only_load_props, refresh_instance=context.refresh_instance
                )
        else:
            def main(context, row):
                return self.select_mapper._instance(context, row, None,
                    extension=context.extension, only_load_props=context.only_load_props, refresh_instance=context.refresh_instance
                )

        if tuples:
            # one processor per element of the result tuple, ``main`` first
            process = []
            process.append(main)
            for tup in mappers_or_columns:
                if isinstance(tup, tuple):
                    (m, alias, alias_id) = tup
                    clauses = self._get_entity_clauses(tup)
                else:
                    clauses = alias = alias_id = None
                    m = tup

                if isinstance(m, type):
                    m = mapper.class_mapper(m)

                # x() / y() are defined and called on the spot so that each
                # ``proc`` closes over its own ``m`` and ``row_adapter``
                # rather than over the loop variables.
                if isinstance(m, mapper.Mapper):
                    def x(m):
                        row_adapter = clauses is not None and clauses.row_decorator or (lambda row: row)
                        def proc(context, row):
                            return m._instance(context, row_adapter(row), None)
                        process.append(proc)
                    x(m)
                elif isinstance(m, (sql.ColumnElement, basestring)):
                    def y(m):
                        row_adapter = clauses is not None and clauses.row_decorator or (lambda row: row)
                        def proc(context, row):
                            return row_adapter(row)[m]
                        process.append(proc)
                    y(m)
                else:
                    raise exceptions.InvalidRequestError("Invalid column expression '%r'" % m)

        while True:
            # reset per batch; presumably filled in by the _instance() calls
            # made while the batch is processed -- TODO confirm
            context.progress = util.Set()
            context.partials = {}

            if self._yield_per:
                fetch = cursor.fetchmany(self._yield_per)
                if not fetch:
                    return
            else:
                fetch = cursor.fetchall()

            # NOTE(review): OrderedSet / UniqueAppender presumably drop
            # duplicate results while keeping order -- confirm in util.
            if tuples:
                rows = util.OrderedSet()
                for row in fetch:
                    rows.add(tuple([proc(context, row) for proc in process]))
            else:
                rows = util.UniqueAppender([])
                for row in fetch:
                    rows.append(main(context, row))

            # a partially refreshed instance commits only the loaded props
            # and is taken out of ``progress`` before the loop below
            if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:
                context.refresh_instance.commit(context.only_load_props)
                context.progress.remove(context.refresh_instance)

            # the mapper used is the one stored in context.attributes under
            # ('populating_mapper', ii), falling back to _state_mapper(ii)
            for ii in context.progress:
                context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii)
                ii.commit_all()

            for ii, attrs in context.partials.items():
                context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii, only_load_props=attrs)
                ii.commit(attrs)

            for row in rows:
                yield row

            # without yield_per everything was fetched in the first pass
            if not self._yield_per:
                break

    def _get(self, key=None, ident=None, refresh_instance=None, lockmode=None, only_load_props=None):
        """Return the single object for ``key`` / ``ident``, or None.

        key
          used to look the object up in ``self.session.identity_map``;
          ``key[1]`` supplies the primary key values when ``ident`` is None.

        ident
          primary key value(s); passed through ``util.to_list``.

        refresh_instance, only_load_props
          forwarded to ``_select_context_options``.

        lockmode
          falls back to ``self._lockmode``; when not None it is applied with
          ``with_lockmode`` and ``version_check`` is requested.

        The identity map is consulted only when ``_populate_existing``,
        ``refresh_instance`` and ``self.mapper.always_refresh`` are all false
        and no lockmode is in effect.  Otherwise, or on a miss, a query is
        run and its first row is returned, or None when there are no rows.

        Raises ``exceptions.InvalidRequestError`` when ``ident`` has fewer
        values than there are primary key columns.
        """
        lockmode = lockmode or self._lockmode
        if not self._populate_existing and not refresh_instance and not self.mapper.always_refresh and lockmode is None:
            try:
                return self.session.identity_map[key]
            except KeyError:
                pass

        if ident is None:
            if key is not None:
                ident = key[1]
        else:
            ident = util.to_list(ident)

        q = self

        # with neither ident nor key, no primary key criterion is added
        if ident is not None:
            q = q._no_criterion('get')
            params = {}
            (_get_clause, _get_params) = self.select_mapper._get_clause
            q = q.filter(_get_clause)
            for i, primary_key in enumerate(self.primary_key_columns):
                try:
                    params[_get_params[primary_key].key] = ident[i]
                except IndexError:
                    raise exceptions.InvalidRequestError("Could not find enough values to formulate primary key for query.get(); primary key columns are %s" % ', '.join(["'%s'" % str(c) for c in self.primary_key_columns]))
            q = q.params(params)

        if lockmode is not None:
            q = q.with_lockmode(lockmode)
        q = q._select_context_options(populate_existing=bool(refresh_instance), version_check=(lockmode is not None), only_load_props=only_load_props, refresh_instance=refresh_instance)
        q._order_by = None
        try:
            # call using all() to avoid LIMIT compilation complexity
            return q.all()[0]
        except IndexError:
            return None

    def _nestable(self, **kwargs):
        """Return true if the given statement options imply it should be nested.

        That is the case when ``limit`` or ``offset`` is present and not
        None, or when ``distinct`` is present and true.
        """

        return (kwargs.get('limit') is not None or kwargs.get('offset') is not None or kwargs.get('distinct', False))

    def count(self, whereclause=None, params=None, **kwargs):
        """Apply this query's criterion to a SELECT COUNT statement.

        The ``whereclause``, ``params`` and ``**kwargs`` arguments are
        deprecated.  Use filter() and other generative methods to establish
        modifiers.

        When given, ``whereclause`` is applied with ``filter()``, ``params``
        with ``params()`` and the keywords with ``_legacy_select_kwargs()``
        before ``_count()`` is called on the result.
        """

        q = self
        if whereclause is not None:
            q = q.filter(whereclause)
        if params is not None:
            q = q.params(params)
        q = q._legacy_select_kwargs(**kwargs)
        return q._count()

    def _count(self):
        """Apply this query's criterion to a SELECT COUNT statement.

        This is the purely generative version which will become the public
        method in version 0.5.

        When ``_nestable()`` is true for ``self._select_args()``, the select
        against ``self.table`` is aliased as ``getcount`` and counted;
        otherwise ``count()`` of the first primary key column is selected
        directly.  The session is autoflushed first when ``_autoflush`` is
        set and ``_populate_existing`` is not.  Returns
        ``self.session.scalar()`` of the resulting statement.
        """

        whereclause = self._criterion

        # NOTE(review): ``context`` is not referenced again in this method.
        context = QueryContext(self)
        from_obj = self._from_obj

        if self._nestable(**self._select_args()):
            s = sql.select([self.table], whereclause, from_obj=from_obj, **self._select_args()).alias('getcount').count()
        else:
            primary_key = self.primary_key_columns
            s = sql.select([sql.func.count(list(primary_key)[0])], whereclause, from_obj=from_obj, **self._select_args())
        if self._autoflush and not self._populate_existing:
            self.session._autoflush()
        return self.session.scalar(s, params=self._params, mapper=self.mapper)

    def compile(self):
        """Compile and return a SQL statement based on the criterion and
        conditions within this Query.

        The value is the ``statement`` attribute of
        ``self._compile_context()``.
        """
        return self._compile_context().statement

    def _compile_context(self): context = QueryContext(self) if self._statement: self._statement.use_labels = True context.statement = self._statement return context whereclause = self._criterion from_obj = self._from_obj # if the query's ClauseAdapter is present, and its # specifically adapting against a modified "select_from" # argument, apply adaptation to the # individually selected columns as well as "eager" clauses added; # otherwise its currently not needed if self._adapter and self.table not in self._get_joinable_tables(): adapter = self._adapter else: adapter = None adapter = self._adapter # TODO: mappers added via add_entity(), adapt their queries also, # if those mappers are polymorphic order_by = self._order_by if order_by is False: order_by = self.select_mapper.order_by if order_by is False: order_by = [] if self.table.default_order_by() is not None: order_by = self.table.default_order_by() if from_obj.default_order_by() is not None: order_by = from_obj.default_order_by() try: for_update = {'read':'read','update':True,'update_nowait':'nowait',None:False}[self._lockmode] except KeyError: raise exceptions.ArgumentError("Unknown lockmode '%s'" % self._lockmode) # if single-table inheritance mapper, add "typecol IN (polymorphic)" criterion so # that we only load the 
appropriate types if self.select_mapper.single and self.select_mapper.polymorphic_on is not None and self.select_mapper.polymorphic_identity is not None: whereclause = sql.and_(whereclause, self.select_mapper.polymorphic_on.in_([m.polymorphic_identity for m in self.select_mapper.polymorphic_iterator()])) context.from_clause = from_obj # give all the attached properties a chance to modify the query # TODO: doing this off the select_mapper. if its the polymorphic mapper, then # it has no relations() on it. should we compile those too into the query ? (i.e. eagerloads) for value in self.select_mapper.iterate_properties: if self._only_load_props and value.key not in self._only_load_props: continue context.exec_with_path(self.select_mapper, value.key, value.setup, context, only_load_props=self._only_load_props) # additional entities/columns, add those to selection criterion for tup in self._entities: (m, alias, alias_id) = tup clauses = self._get_entity_clauses(tup) if isinstance(m, mapper.Mapper): for value in m.iterate_properties: context.exec_with_path(m, value.key, value.setup, context, parentclauses=clauses) elif isinstance(m, sql.ColumnElement): if clauses is not None: m = clauses.aliased_column(m) context.secondary_columns.append(m) if self._eager_loaders and self._nestable(**self._select_args()): # eager loaders are present, and the SELECT has limiting criterion # produce a "wrapped" selectable. 
# ensure all 'order by' elements are ClauseElement instances # (since they will potentially be aliased) # locate all embedded Column clauses so they can be added to the # "inner" select statement where they'll be available to the enclosing # statement's "order by" cf = util.Set() if order_by: order_by = [expression._literal_as_text(o) for o in util.to_list(order_by) or []] for o in order_by: cf.update(sql_util.find_columns(o)) if adapter: # TODO: make usage of the ClauseAdapter here to create the list # of primary columns context.primary_columns = [from_obj.corresponding_column(c) or c for c in context.primary_columns]
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -