⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 query.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 4 页
字号:
                cf = [from_obj.corresponding_column(c) or c for c in cf]            s2 = sql.select(context.primary_columns + list(cf), whereclause, from_obj=context.from_clause, use_labels=True, correlate=False, order_by=util.to_list(order_by), **self._select_args())            s3 = s2.alias()            context.row_adapter = mapperutil.create_row_adapter(s3, self.table)            statement = sql.select([s3] + context.secondary_columns, for_update=for_update, use_labels=True)            if context.eager_joins:                eager_joins = sql_util.ClauseAdapter(s3).traverse(context.eager_joins)                statement.append_from(eager_joins, _copy_collection=False)            if order_by:                statement.append_order_by(*sql_util.ClauseAdapter(s3).copy_and_process(order_by))            statement.append_order_by(*context.eager_order_by)        else:            if adapter:                # TODO: make usage of the ClauseAdapter here to create row adapter, list                # of primary columns                context.primary_columns = [from_obj.corresponding_column(c) or c for c in context.primary_columns]                context.row_adapter = mapperutil.create_row_adapter(from_obj, self.table)            if self._distinct:                if order_by:                    order_by = [expression._literal_as_text(o) for o in util.to_list(order_by) or []]                if self._distinct and order_by:                    cf = util.Set()                    for o in order_by:                        cf.update(sql_util.find_columns(o))                    for c in cf:                        context.primary_columns.append(c)            statement = sql.select(context.primary_columns + context.secondary_columns, whereclause, from_obj=from_obj, use_labels=True, for_update=for_update, order_by=util.to_list(order_by), **self._select_args())            if context.eager_joins:                if adapter:                    context.eager_joins = adapter.traverse(context.eager_joins)                statement.append_from(context.eager_joins, _copy_collection=False)            if context.eager_order_by:                if adapter:                    context.eager_order_by = adapter.copy_and_process(context.eager_order_by)                statement.append_order_by(*context.eager_order_by)        context.statement = statement        return context    def _select_args(self):        """Return a dictionary of attributes that can be applied to a ``sql.Select`` statement.        """        return {'limit':self._limit, 'offset':self._offset, 'distinct':self._distinct, 'group_by':self._group_by or None, 'having':self._having or None}    def _get_entity_clauses(self, m):        """for tuples added via add_entity() or add_column(), attempt to locate        an AliasedClauses object which should be used to formulate the query as well        as to process result rows."""        (m, alias, alias_id) = m        if alias is not None:            return alias        if alias_id is not None:            try:                return self._alias_ids[alias_id]            except KeyError:                raise exceptions.InvalidRequestError("Query has no alias identified by '%s'" % alias_id)        if isinstance(m, type):            m = mapper.class_mapper(m)        if isinstance(m, mapper.Mapper):            l = self._alias_ids.get(m)            if l:                if len(l) > 1:                    raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_entity()" % str(m))                else:                    return l[0]            else:                return None        elif isinstance(m, sql.ColumnElement):            aliases = []            for table in sql_util.find_tables(m, check_columns=True):                for a in self._alias_ids.get(table, []):                    aliases.append(a)            if len(aliases) > 1:                raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_column()" % str(m))            elif len(aliases) == 1:                return aliases[0]            else:                return None    def __log_debug(self, msg):        self.logger.debug(msg)    def __str__(self):        return str(self.compile())    # DEPRECATED LAND !    def list(self): #pragma: no cover        """DEPRECATED.  use all()"""        return list(self)    def scalar(self): #pragma: no cover        """DEPRECATED.  use first()"""        return self.first()    def _legacy_filter_by(self, *args, **kwargs): #pragma: no cover        return self.filter(self._legacy_join_by(args, kwargs, start=self._joinpoint))    def count_by(self, *args, **params): #pragma: no cover        """DEPRECATED.  use query.filter_by(\**params).count()"""        return self.count(self.join_by(*args, **params))    def select_whereclause(self, whereclause=None, params=None, **kwargs): #pragma: no cover        """DEPRECATED.  use query.filter(whereclause).all()"""        q = self.filter(whereclause)._legacy_select_kwargs(**kwargs)        if params is not None:            q = q.params(params)        return list(q)    def _legacy_select_from(self, from_obj):        q = self._clone()        if len(from_obj) > 1:            raise exceptions.ArgumentError("Multiple-entry from_obj parameter no longer supported")        q._from_obj = from_obj[0]        return q    def _legacy_select_kwargs(self, **kwargs): #pragma: no cover        q = self        if "order_by" in kwargs and kwargs['order_by']:            q = q.order_by(kwargs['order_by'])        if "group_by" in kwargs:            q = q.group_by(kwargs['group_by'])        if "from_obj" in kwargs:            q = q._legacy_select_from(kwargs['from_obj'])        if "lockmode" in kwargs:            q = q.with_lockmode(kwargs['lockmode'])        if "distinct" in kwargs:            q = q.distinct()        if "limit" in kwargs:            q = q.limit(kwargs['limit'])        if "offset" in kwargs:            q = q.offset(kwargs['offset'])        return q    def get_by(self, *args, **params): #pragma: no cover        """DEPRECATED.  use query.filter_by(\**params).first()"""        ret = self._extension.get_by(self, *args, **params)        if ret is not mapper.EXT_CONTINUE:            return ret        return self._legacy_filter_by(*args, **params).first()    def select_by(self, *args, **params): #pragma: no cover        """DEPRECATED. use use query.filter_by(\**params).all()."""        ret = self._extension.select_by(self, *args, **params)        if ret is not mapper.EXT_CONTINUE:            return ret        return self._legacy_filter_by(*args, **params).list()    def join_by(self, *args, **params): #pragma: no cover        """DEPRECATED. use join() to construct joins based on attribute names."""        return self._legacy_join_by(args, params, start=self._joinpoint)    def _build_select(self, arg=None, params=None, **kwargs): #pragma: no cover        if isinstance(arg, sql.FromClause) and arg.supports_execution():            return self.from_statement(arg)        else:            return self.filter(arg)._legacy_select_kwargs(**kwargs)    def selectfirst(self, arg=None, **kwargs): #pragma: no cover        """DEPRECATED.  use query.filter(whereclause).first()"""        return self._build_select(arg, **kwargs).first()    def selectone(self, arg=None, **kwargs): #pragma: no cover        """DEPRECATED.  use query.filter(whereclause).one()"""        return self._build_select(arg, **kwargs).one()    def select(self, arg=None, **kwargs): #pragma: no cover        """DEPRECATED.  use query.filter(whereclause).all(), or query.from_statement(statement).all()"""        ret = self._extension.select(self, arg=arg, **kwargs)        if ret is not mapper.EXT_CONTINUE:            return ret        return self._build_select(arg, **kwargs).all()    def execute(self, clauseelement, params=None, *args, **kwargs): #pragma: no cover        """DEPRECATED.  use query.from_statement().all()"""        return self._select_statement(clauseelement, params, **kwargs)    def select_statement(self, statement, **params): #pragma: no cover        """DEPRECATED.  Use query.from_statement(statement)"""        return self._select_statement(statement, params)    def select_text(self, text, **params): #pragma: no cover        """DEPRECATED.  Use query.from_statement(statement)"""        return self._select_statement(text, params)    def _select_statement(self, statement, params=None, **kwargs): #pragma: no cover        q = self.from_statement(statement)        if params is not None:            q = q.params(params)        q._select_context_options(**kwargs)        return list(q)    def _select_context_options(self, populate_existing=None, version_check=None, only_load_props=None, refresh_instance=None): #pragma: no cover        if populate_existing:            self._populate_existing = populate_existing        if version_check:            self._version_check = version_check        if refresh_instance:            self._refresh_instance = refresh_instance        if only_load_props:            self._only_load_props = util.Set(only_load_props)        return self    def join_to(self, key): #pragma: no cover        """DEPRECATED. use join() to create joins based on property names."""        [keys, p] = self._locate_prop(key)        return self.join_via(keys)    def join_via(self, keys): #pragma: no cover        """DEPRECATED. use join() to create joins based on property names."""        mapper = self._joinpoint        clause = None        for key in keys:            prop = mapper.get_property(key, resolve_synonyms=True)            if clause is None:                clause = prop.get_join(mapper)            else:                clause &= prop.get_join(mapper)            mapper = prop.mapper        return clause    def _legacy_join_by(self, args, params, start=None): #pragma: no cover        import properties        clause = None        for arg in args:            if clause is None:                clause = arg            else:                clause &= arg        for key, value in params.iteritems():            (keys, prop) = self._locate_prop(key, start=start)            if isinstance(prop, properties.PropertyLoader):                c = prop.compare(operators.eq, value) & self.join_via(keys[:-1])            else:                c = prop.compare(operators.eq, value) & self.join_via(keys)            if clause is None:                clause =  c            else:                clause &= c        return clause    def _locate_prop(self, key, start=None): #pragma: no cover        import properties        keys = []        seen = util.Set()        def search_for_prop(mapper_):            if mapper_ in seen:                return None            seen.add(mapper_)            prop = mapper_.get_property(key, resolve_synonyms=True, raiseerr=False)            if prop is not None:                if isinstance(prop, properties.PropertyLoader):                    keys.insert(0, prop.key)                return prop            else:                for prop in mapper_.iterate_properties:                    if not isinstance(prop, properties.PropertyLoader):                        continue                    x = search_for_prop(prop.mapper)                    if x:                        keys.insert(0, prop.key)                        return x                else:                    return None        p = search_for_prop(start or self.mapper)        if p is None:            raise exceptions.InvalidRequestError("Can't locate property named '%s'" % key)        return [keys, p]    def selectfirst_by(self, *args, **params): #pragma: no cover        """DEPRECATED. Use query.filter_by(\**kwargs).first()"""        return self._legacy_filter_by(*args, **params).first()    def selectone_by(self, *args, **params): #pragma: no cover        """DEPRECATED. Use query.filter_by(\**kwargs).one()"""        return self._legacy_filter_by(*args, **params).one()for deprecated_method in ('list', 'scalar', 'count_by',                          'select_whereclause', 'get_by', 'select_by',                          'join_by', 'selectfirst', 'selectone', 'select',                          'execute', 'select_statement', 'select_text',                          'join_to', 'join_via', 'selectfirst_by',                          'selectone_by'):    setattr(Query, deprecated_method,            util.deprecated(getattr(Query, deprecated_method),                            add_deprecation_to_docstring=False))Query.logger = logging.class_logger(Query)class QueryContext(object):    def __init__(self, query):        self.query = query        self.mapper = query.mapper        self.session = query.session        self.extension = query._extension        self.statement = None        self.row_adapter = None        self.populate_existing = query._populate_existing        self.version_check = query._version_check        self.only_load_props = query._only_load_props        self.refresh_instance = query._refresh_instance        self.path = ()        self.primary_columns = []        self.secondary_columns = []        self.eager_order_by = []        self.eager_joins = None        self.options = query._with_options        self.attributes = query._attributes.copy()    def exec_with_path(self, mapper, propkey, func, *args, **kwargs):        oldpath = self.path        self.path += (mapper.base_mapper, propkey)        try:            return func(*args, **kwargs)        finally:            self.path = oldpath_runid = 1L_id_lock = util.threading.Lock()def _new_runid():    global _runid    _id_lock.acquire()    try:        _runid += 1        return _runid    finally:        _id_lock.release()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -