📄 query.py
字号:
# NOTE(review): the next line is the tail of a method whose ``def`` header lies
# before the start of this view.  It is kept exactly as found (whitespace
# collapsed) because its nesting cannot be recovered reliably from here.
cf = [from_obj.corresponding_column(c) or c for c in cf] s2 = sql.select(context.primary_columns + list(cf), whereclause, from_obj=context.from_clause, use_labels=True, correlate=False, order_by=util.to_list(order_by), **self._select_args()) s3 = s2.alias() context.row_adapter = mapperutil.create_row_adapter(s3, self.table) statement = sql.select([s3] + context.secondary_columns, for_update=for_update, use_labels=True) if context.eager_joins: eager_joins = sql_util.ClauseAdapter(s3).traverse(context.eager_joins) statement.append_from(eager_joins, _copy_collection=False) if order_by: statement.append_order_by(*sql_util.ClauseAdapter(s3).copy_and_process(order_by)) statement.append_order_by(*context.eager_order_by) else: if adapter: # TODO: make usage of the ClauseAdapter here to create row adapter, list # of primary columns context.primary_columns = [from_obj.corresponding_column(c) or c for c in context.primary_columns] context.row_adapter = mapperutil.create_row_adapter(from_obj, self.table) if self._distinct: if order_by: order_by = [expression._literal_as_text(o) for o in util.to_list(order_by) or []] if self._distinct and order_by: cf = util.Set() for o in order_by: cf.update(sql_util.find_columns(o)) for c in cf: context.primary_columns.append(c) statement = sql.select(context.primary_columns + context.secondary_columns, whereclause, from_obj=from_obj, use_labels=True, for_update=for_update, order_by=util.to_list(order_by), **self._select_args()) if context.eager_joins: if adapter: context.eager_joins = adapter.traverse(context.eager_joins) statement.append_from(context.eager_joins, _copy_collection=False) if context.eager_order_by: if adapter: context.eager_order_by = adapter.copy_and_process(context.eager_order_by) statement.append_order_by(*context.eager_order_by) context.statement = statement return context

    def _select_args(self):
        """Return a dictionary of attributes that can be applied to a ``sql.Select`` statement.

        The keys are ``limit``, ``offset``, ``distinct``, ``group_by`` and
        ``having``, read from the matching underscore-prefixed attributes of
        this query.  ``group_by`` and ``having`` are reported as ``None``
        whenever the stored value is falsy.
        """
        return {'limit':self._limit, 'offset':self._offset, 'distinct':self._distinct, 'group_by':self._group_by or None, 'having':self._having or None}

    def _get_entity_clauses(self, m):
        """Locate the alias object to use for an extra entity or column.

        For tuples added via add_entity() or add_column(), attempt to locate
        an AliasedClauses object which should be used to formulate the query
        as well as to process result rows.

        ``m`` is a 3-tuple ``(entity, alias, alias_id)``.  Resolution order:

        1. ``alias`` itself, when it is not None.
        2. ``self._alias_ids[alias_id]``, when ``alias_id`` is not None.
        3. A lookup in ``self._alias_ids`` keyed by the entity's mapper (a
           class is first converted with ``mapper.class_mapper``), or, for a
           ``sql.ColumnElement``, by each table found in the column.

        Returns the alias found, or None when nothing matches (including when
        the entity is neither a mapper nor a column element).

        Raises ``exceptions.InvalidRequestError`` when ``alias_id`` is unknown
        or when more than one alias matches.
        """
        # the parameter is rebound to the first element of the tuple
        (m, alias, alias_id) = m
        if alias is not None:
            return alias
        if alias_id is not None:
            try:
                return self._alias_ids[alias_id]
            except KeyError:
                raise exceptions.InvalidRequestError("Query has no alias identified by '%s'" % alias_id)
        if isinstance(m, type):
            m = mapper.class_mapper(m)
        if isinstance(m, mapper.Mapper):
            l = self._alias_ids.get(m)
            if l:
                if len(l) > 1:
                    raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_entity()" % str(m))
                else:
                    return l[0]
            else:
                return None
        elif isinstance(m, sql.ColumnElement):
            # gather every alias registered for any table the column refers to
            aliases = []
            for table in sql_util.find_tables(m, check_columns=True):
                for a in self._alias_ids.get(table, []):
                    aliases.append(a)
            if len(aliases) > 1:
                raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_column()" % str(m))
            elif len(aliases) == 1:
                return aliases[0]
            else:
                return None

    def __log_debug(self, msg):
        """Send ``msg`` to ``self.logger`` at debug level."""
        self.logger.debug(msg)

    def __str__(self):
        """Return ``str()`` of the result of ``self.compile()``."""
        return str(self.compile())

    # DEPRECATED LAND !

    def list(self): #pragma: no cover
        """DEPRECATED.  use all()

        Returns the results of iterating this query as a list.
        """
        # ``list`` here is the builtin: names defined in the class body are
        # not in scope inside a method body.
        return list(self)

    def scalar(self): #pragma: no cover
        """DEPRECATED.  use first()"""
        return self.first()

    def _legacy_filter_by(self, *args, **kwargs): #pragma: no cover
        """Filter by the criterion ``_legacy_join_by`` builds from the arguments.

        The property search starts at ``self._joinpoint``.
        """
        return self.filter(self._legacy_join_by(args, kwargs, start=self._joinpoint))

    def count_by(self, *args, **params): #pragma: no cover
        """DEPRECATED.  use query.filter_by(\**params).count()"""
        return self.count(self.join_by(*args, **params))

    def select_whereclause(self, whereclause=None, params=None, **kwargs): #pragma: no cover
        """DEPRECATED.  use query.filter(whereclause).all()

        Filters by ``whereclause``, applies the legacy keyword options in
        ``kwargs`` (see ``_legacy_select_kwargs``), applies ``params`` when it
        is not None, and returns the results as a list.
        """
        q = self.filter(whereclause)._legacy_select_kwargs(**kwargs)
        if params is not None:
            q = q.params(params)
        return list(q)

    def _legacy_select_from(self, from_obj):
        """Return a clone of this query whose ``_from_obj`` is ``from_obj[0]``.

        ``from_obj`` is indexed and measured with ``len()``, so it is expected
        to be a sequence.  Raises ``exceptions.ArgumentError`` when it holds
        more than one entry.
        """
        q = self._clone()
        if len(from_obj) > 1:
            raise exceptions.ArgumentError("Multiple-entry from_obj parameter no longer supported")
        # NOTE(review): an empty sequence passes the check above and would
        # fail on this index -- confirm callers never pass one.
        q._from_obj = from_obj[0]
        return q

    def _legacy_select_kwargs(self, **kwargs): #pragma: no cover
        """Translate legacy ``select()`` keyword options into chained query calls.

        Recognised keys: ``order_by``, ``group_by``, ``from_obj``,
        ``lockmode``, ``distinct``, ``limit`` and ``offset``.  Other keys are
        ignored.  Returns the resulting query (``self`` when no key applies).
        """
        q = self
        # order_by is the only option that is skipped when its value is falsy
        if "order_by" in kwargs and kwargs['order_by']:
            q = q.order_by(kwargs['order_by'])
        if "group_by" in kwargs:
            q = q.group_by(kwargs['group_by'])
        if "from_obj" in kwargs:
            q = q._legacy_select_from(kwargs['from_obj'])
        if "lockmode" in kwargs:
            q = q.with_lockmode(kwargs['lockmode'])
        # the mere presence of the key enables DISTINCT; its value is not read
        if "distinct" in kwargs:
            q = q.distinct()
        if "limit" in kwargs:
            q = q.limit(kwargs['limit'])
        if "offset" in kwargs:
            q = q.offset(kwargs['offset'])
        return q

    def get_by(self, *args, **params): #pragma: no cover
        """DEPRECATED.  use query.filter_by(\**params).first()

        ``self._extension.get_by`` is consulted first; any result other than
        ``mapper.EXT_CONTINUE`` is returned as-is.
        """
        ret = self._extension.get_by(self, *args, **params)
        if ret is not mapper.EXT_CONTINUE:
            return ret

        return self._legacy_filter_by(*args, **params).first()

    def select_by(self, *args, **params): #pragma: no cover
        """DEPRECATED. use query.filter_by(\**params).all().

        ``self._extension.select_by`` is consulted first; any result other
        than ``mapper.EXT_CONTINUE`` is returned as-is.
        """
        ret = self._extension.select_by(self, *args, **params)
        if ret is not mapper.EXT_CONTINUE:
            return ret

        return self._legacy_filter_by(*args, **params).list()

    def join_by(self, *args, **params): #pragma: no cover
        """DEPRECATED. use join() to construct joins based on attribute names."""
        return self._legacy_join_by(args, params, start=self._joinpoint)

    def _build_select(self, arg=None, params=None, **kwargs): #pragma: no cover
        """Build the query used by the legacy ``select*`` methods.

        When ``arg`` is a ``sql.FromClause`` that supports execution it is
        used as a complete statement via ``from_statement``; otherwise ``arg``
        is used as filter criterion and ``kwargs`` are applied through
        ``_legacy_select_kwargs``.  ``params`` is accepted but not used here.
        """
        if isinstance(arg, sql.FromClause) and arg.supports_execution():
            return self.from_statement(arg)
        else:
            return self.filter(arg)._legacy_select_kwargs(**kwargs)

    def selectfirst(self, arg=None, **kwargs): #pragma: no cover
        """DEPRECATED.  use query.filter(whereclause).first()"""
        return self._build_select(arg, **kwargs).first()

    def selectone(self, arg=None, **kwargs): #pragma: no cover
        """DEPRECATED.  use query.filter(whereclause).one()"""
        return self._build_select(arg, **kwargs).one()

    def select(self, arg=None, **kwargs): #pragma: no cover
        """DEPRECATED.  use query.filter(whereclause).all(), or query.from_statement(statement).all()

        ``self._extension.select`` is consulted first; any result other than
        ``mapper.EXT_CONTINUE`` is returned as-is.
        """
        ret = self._extension.select(self, arg=arg, **kwargs)
        if ret is not mapper.EXT_CONTINUE:
            return ret
        return self._build_select(arg, **kwargs).all()

    def execute(self, clauseelement, params=None, *args, **kwargs): #pragma: no cover
        """DEPRECATED.  use query.from_statement().all()

        Extra positional arguments are accepted but not used.
        """
        return self._select_statement(clauseelement, params, **kwargs)

    def select_statement(self, statement, **params): #pragma: no cover
        """DEPRECATED.  Use query.from_statement(statement)

        Keyword arguments are passed on as the bind parameter dictionary.
        """
        return self._select_statement(statement, params)

    def select_text(self, text, **params): #pragma: no cover
        """DEPRECATED.  Use query.from_statement(statement)

        Keyword arguments are passed on as the bind parameter dictionary.
        """
        return self._select_statement(text, params)

    def _select_statement(self, statement, params=None, **kwargs): #pragma: no cover
        """Run ``statement`` through ``from_statement`` and return the results as a list.

        ``params`` is applied when it is not None; ``kwargs`` are handed to
        ``_select_context_options`` on the new query before it is iterated.
        """
        q = self.from_statement(statement)
        if params is not None:
            q = q.params(params)
        # return value ignored: the options are set on ``q`` in place
        q._select_context_options(**kwargs)
        return list(q)

    def _select_context_options(self, populate_existing=None, version_check=None, only_load_props=None, refresh_instance=None): #pragma: no cover
        """Set loading options on this query in place and return ``self``.

        Each option is stored on the matching underscore-prefixed attribute
        only when the given value is truthy; ``only_load_props`` is stored as
        a ``util.Set``.
        """
        if populate_existing:
            self._populate_existing = populate_existing
        if version_check:
            self._version_check = version_check
        if refresh_instance:
            self._refresh_instance = refresh_instance
        if only_load_props:
            self._only_load_props = util.Set(only_load_props)
        return self

    def join_to(self, key): #pragma: no cover
        """DEPRECATED. use join() to create joins based on property names."""
        # only the key path is needed; the located property itself is unused
        [keys, p] = self._locate_prop(key)
        return self.join_via(keys)

    def join_via(self, keys): #pragma: no cover
        """DEPRECATED. use join() to create joins based on property names.

        Walks ``keys`` starting from ``self._joinpoint``, AND-ing together
        ``prop.get_join(mapper)`` for each property and then moving on to that
        property's mapper.  Returns the combined clause, or None when ``keys``
        is empty.
        """
        # local name; inside this method it hides whatever ``mapper`` refers
        # to at module level
        mapper = self._joinpoint
        clause = None
        for key in keys:
            prop = mapper.get_property(key, resolve_synonyms=True)
            if clause is None:
                clause = prop.get_join(mapper)
            else:
                clause &= prop.get_join(mapper)
            mapper = prop.mapper

        return clause

    def _legacy_join_by(self, args, params, start=None): #pragma: no cover
        """Build one criterion from positional clauses and keyword equality tests.

        ``args`` are AND-ed together as given.  For each ``key``/``value`` in
        ``params`` the property is located with ``_locate_prop`` (searching
        from ``start``), compared for equality with ``value``, and AND-ed with
        the join along the located key path.  Returns the combined clause, or
        None when both ``args`` and ``params`` are empty.
        """
        import properties

        clause = None
        for arg in args:
            if clause is None:
                clause = arg
            else:
                clause &= arg

        for key, value in params.iteritems():
            (keys, prop) = self._locate_prop(key, start=start)
            if isinstance(prop, properties.PropertyLoader):
                # for a PropertyLoader the last key is the property itself
                # (see _locate_prop), so the join stops one step short of it
                c = prop.compare(operators.eq, value) & self.join_via(keys[:-1])
            else:
                c = prop.compare(operators.eq, value) & self.join_via(keys)
            if clause is None:
                clause = c
            else:
                clause &= c
        return clause

    def _locate_prop(self, key, start=None): #pragma: no cover
        """Find the property named ``key``, searching through related mappers.

        The search is depth-first from ``start`` (or ``self.mapper`` when
        ``start`` is falsy), following ``PropertyLoader`` properties and
        visiting each mapper at most once.

        Returns ``[keys, prop]`` where ``keys`` lists the property keys leading
        from the starting mapper to the match; when the match is itself a
        ``PropertyLoader`` its own key is included as the last entry.

        Raises ``exceptions.InvalidRequestError`` when no property is found.
        """
        import properties
        keys = []
        seen = util.Set()
        def search_for_prop(mapper_):
            # guard against cycles between mappers
            if mapper_ in seen:
                return None
            seen.add(mapper_)

            prop = mapper_.get_property(key, resolve_synonyms=True, raiseerr=False)
            if prop is not None:
                if isinstance(prop, properties.PropertyLoader):
                    keys.insert(0, prop.key)
                return prop
            else:
                for prop in mapper_.iterate_properties:
                    if not isinstance(prop, properties.PropertyLoader):
                        continue
                    x = search_for_prop(prop.mapper)
                    if x:
                        # prepend on the way back up so ``keys`` ends up
                        # ordered from the starting mapper outward
                        keys.insert(0, prop.key)
                        return x
                # NOTE(review): this ``else`` was reconstructed as belonging to
                # the ``for`` loop (runs when no relation produced a match) --
                # confirm against the original indentation.
                else:
                    return None
        p = search_for_prop(start or self.mapper)
        if p is None:
            raise exceptions.InvalidRequestError("Can't locate property named '%s'" % key)
        return [keys, p]

    def selectfirst_by(self, *args, **params): #pragma: no cover
        """DEPRECATED. Use query.filter_by(\**kwargs).first()"""
        return self._legacy_filter_by(*args, **params).first()

    def selectone_by(self, *args, **params): #pragma: no cover
        """DEPRECATED. Use query.filter_by(\**kwargs).one()"""
        return self._legacy_filter_by(*args, **params).one()

# Replace each legacy method on Query with the result of util.deprecated().
# NOTE(review): util.deprecated is defined elsewhere; presumably the wrapper
# warns on each call -- confirm there.
for deprecated_method in ('list', 'scalar', 'count_by',
    'select_whereclause', 'get_by', 'select_by', 'join_by', 'selectfirst',
    'selectone', 'select', 'execute', 'select_statement', 'select_text',
    'join_to', 'join_via', 'selectfirst_by', 'selectone_by'):
    setattr(Query, deprecated_method,
        util.deprecated(getattr(Query, deprecated_method),
        add_deprecation_to_docstring=False))

# NOTE(review): ``class_logger`` is not part of the standard library's logging
# module, so ``logging`` here is presumably a project module -- confirm.
Query.logger = logging.class_logger(Query)

class QueryContext(object):
    """State holder built from a Query.

    Copies a selection of the query's attributes and starts with empty
    collections for primary/secondary columns, eager ORDER BY elements and
    eager joins, plus an empty ``path`` tuple.
    """

    def __init__(self, query):
        """Initialise the context from ``query``."""
        self.query = query
        self.mapper = query.mapper
        self.session = query.session
        self.extension = query._extension
        self.statement = None
        self.row_adapter = None
        self.populate_existing = query._populate_existing
        self.version_check = query._version_check
        self.only_load_props = query._only_load_props
        self.refresh_instance = query._refresh_instance
        self.path = ()
        self.primary_columns = []
        self.secondary_columns = []
        self.eager_order_by = []
        self.eager_joins = None
        self.options = query._with_options
        # the result of ``.copy()`` is stored, not the query's own object
        self.attributes = query._attributes.copy()

    def exec_with_path(self, mapper, propkey, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` with ``self.path`` temporarily extended.

        ``(mapper.base_mapper, propkey)`` is appended to ``self.path`` for the
        duration of the call; the previous path is restored afterwards, also
        when ``func`` raises.  Returns whatever ``func`` returns.
        """
        oldpath = self.path
        self.path += (mapper.base_mapper, propkey)
        try:
            return func(*args, **kwargs)
        finally:
            self.path = oldpath

# module-level counter handed out by _new_runid(), guarded by _id_lock
_runid = 1L
_id_lock = util.threading.Lock()

def _new_runid():
    """Increment the module-level ``_runid`` while holding ``_id_lock`` and return the new value."""
    global _runid
    _id_lock.acquire()
    try:
        _runid += 1
        return _runid
    finally:
        _id_lock.release()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -