⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 query.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 4 页
字号:
                elif start is not None and stop is None:                    res._offset = (self._offset or 0) + start                if item.step is not None:                    return list(res)[None:None:item.step]                else:                    return res        else:            return list(self[item:item+1])[0]    def limit(self, limit):        """Apply a ``LIMIT`` to the query and return the newly resulting        ``Query``.        """        return self[:limit]    def offset(self, offset):        """Apply an ``OFFSET`` to the query and return the newly resulting        ``Query``.        """        return self[offset:]    def distinct(self):        """Apply a ``DISTINCT`` to the query and return the newly resulting        ``Query``.        """        new = self._no_statement("distinct")        new._distinct = True        return new    def all(self):        """Return the results represented by this ``Query`` as a list.        This results in an execution of the underlying query.        """        return list(self)    def from_statement(self, statement):        """Execute the given SELECT statement and return results.        This method bypasses all internal statement compilation, and the        statement is executed without modification.        The statement argument is either a string, a ``select()`` construct,        or a ``text()`` construct, and should return the set of columns        appropriate to the entity class represented by this ``Query``.        Also see the ``instances()`` method.        """        if isinstance(statement, basestring):            statement = sql.text(statement)        q = self._no_criterion('from_statement')        q._statement = statement        return q    def first(self):        """Return the first result of this ``Query`` or None if the result doesn't contain any row.        This results in an execution of the underlying query.        """        if self._column_aggregate is not None:            return self._col_aggregate(*self._column_aggregate)        ret = list(self[0:1])        if len(ret) > 0:            return ret[0]        else:            return None    def one(self):        """Return the first result of this ``Query``, raising an exception if more than one row exists.        This results in an execution of the underlying query.        """        if self._column_aggregate is not None:            return self._col_aggregate(*self._column_aggregate)        ret = list(self[0:2])        if len(ret) == 1:            return ret[0]        elif len(ret) == 0:            raise exceptions.InvalidRequestError('No rows returned for one()')        else:            raise exceptions.InvalidRequestError('Multiple rows returned for one()')    def __iter__(self):        context = self._compile_context()        context.statement.use_labels = True        if self._autoflush and not self._populate_existing:            self.session._autoflush()        return self._execute_and_instances(context)    def _execute_and_instances(self, querycontext):        result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)        return self.iterate_instances(result, querycontext=querycontext)    def instances(self, cursor, *mappers_or_columns, **kwargs):        return list(self.iterate_instances(cursor, *mappers_or_columns, **kwargs))    def iterate_instances(self, cursor, *mappers_or_columns, **kwargs):        session = self.session        context = kwargs.pop('querycontext', None)        if context is None:            context = QueryContext(self)        context.runid = _new_runid()        mappers_or_columns = tuple(self._entities) + mappers_or_columns        tuples = bool(mappers_or_columns)        if context.row_adapter:            def main(context, row):                return self.select_mapper._instance(context, context.row_adapter(row), None,                    extension=context.extension, only_load_props=context.only_load_props, refresh_instance=context.refresh_instance                )        else:            def main(context, row):                return self.select_mapper._instance(context, row, None,                    extension=context.extension, only_load_props=context.only_load_props, refresh_instance=context.refresh_instance                )        if tuples:            process = []            process.append(main)            for tup in mappers_or_columns:                if isinstance(tup, tuple):                    (m, alias, alias_id) = tup                    clauses = self._get_entity_clauses(tup)                else:                    clauses = alias = alias_id = None                    m = tup                if isinstance(m, type):                    m = mapper.class_mapper(m)                if isinstance(m, mapper.Mapper):                    def x(m):                        row_adapter = clauses is not None and clauses.row_decorator or (lambda row: row)                        def proc(context, row):                            return m._instance(context, row_adapter(row), None)                        process.append(proc)                    x(m)                elif isinstance(m, (sql.ColumnElement, basestring)):                    def y(m):                        row_adapter = clauses is not None and clauses.row_decorator or (lambda row: row)                        def proc(context, row):                            return row_adapter(row)[m]                        process.append(proc)                    y(m)                else:                    raise exceptions.InvalidRequestError("Invalid column expression '%r'" % m)        while True:            context.progress = util.Set()            context.partials = {}            if self._yield_per:                fetch = cursor.fetchmany(self._yield_per)                if not fetch:                    return            else:                fetch = cursor.fetchall()            if tuples:                rows = util.OrderedSet()                for row in fetch:                    rows.add(tuple([proc(context, row) for proc in process]))            else:                rows = util.UniqueAppender([])                for row in fetch:                    rows.append(main(context, row))            if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:                context.refresh_instance.commit(context.only_load_props)                context.progress.remove(context.refresh_instance)            for ii in context.progress:                context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii)                ii.commit_all()                            for ii, attrs in context.partials.items():                context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii, only_load_props=attrs)                ii.commit(attrs)                            for row in rows:                yield row            if not self._yield_per:                break    def _get(self, key=None, ident=None, refresh_instance=None, lockmode=None, only_load_props=None):        lockmode = lockmode or self._lockmode        if not self._populate_existing and not refresh_instance and not self.mapper.always_refresh and lockmode is None:            try:                return self.session.identity_map[key]            except KeyError:                pass        if ident is None:            if key is not None:                ident = key[1]        else:            ident = util.to_list(ident)        q = self        if ident is not None:            q = q._no_criterion('get')            params = {}            (_get_clause, _get_params) = self.select_mapper._get_clause            q = q.filter(_get_clause)            for i, primary_key in enumerate(self.primary_key_columns):                try:                    params[_get_params[primary_key].key] = ident[i]                except IndexError:                    raise exceptions.InvalidRequestError("Could not find enough values to formulate primary key for query.get(); primary key columns are %s" % ', '.join(["'%s'" % str(c) for c in self.primary_key_columns]))            q = q.params(params)        if lockmode is not None:            q = q.with_lockmode(lockmode)        q = q._select_context_options(populate_existing=bool(refresh_instance), version_check=(lockmode is not None), only_load_props=only_load_props, refresh_instance=refresh_instance)        q._order_by = None        try:            # call using all() to avoid LIMIT compilation complexity            return q.all()[0]        except IndexError:            return None    def _nestable(self, **kwargs):        """Return true if the given statement options imply it should be nested."""        return (kwargs.get('limit') is not None or kwargs.get('offset') is not None or kwargs.get('distinct', False))    def count(self, whereclause=None, params=None, **kwargs):        """Apply this query's criterion to a SELECT COUNT statement.        the whereclause, params and \**kwargs arguments are deprecated.  use filter()        and other generative methods to establish modifiers.        """        q = self        if whereclause is not None:            q = q.filter(whereclause)        if params is not None:            q = q.params(params)        q = q._legacy_select_kwargs(**kwargs)        return q._count()    def _count(self):        """Apply this query's criterion to a SELECT COUNT statement.        this is the purely generative version which will become        the public method in version 0.5.        """        whereclause = self._criterion        context = QueryContext(self)        from_obj = self._from_obj        if self._nestable(**self._select_args()):            s = sql.select([self.table], whereclause, from_obj=from_obj, **self._select_args()).alias('getcount').count()        else:            primary_key = self.primary_key_columns            s = sql.select([sql.func.count(list(primary_key)[0])], whereclause, from_obj=from_obj, **self._select_args())        if self._autoflush and not self._populate_existing:            self.session._autoflush()        return self.session.scalar(s, params=self._params, mapper=self.mapper)    def compile(self):        """compiles and returns a SQL statement based on the criterion and conditions within this Query."""        return self._compile_context().statement    def _compile_context(self):        context = QueryContext(self)        if self._statement:            self._statement.use_labels = True            context.statement = self._statement            return context        whereclause = self._criterion        from_obj = self._from_obj        # if the query's ClauseAdapter is present, and its        # specifically adapting against a modified "select_from"        # argument, apply adaptation to the        # individually selected columns as well as "eager" clauses added;        # otherwise its currently not needed        if self._adapter and self.table not in self._get_joinable_tables():            adapter = self._adapter        else:            adapter = None        adapter = self._adapter        # TODO: mappers added via add_entity(), adapt their queries also,        # if those mappers are polymorphic        order_by = self._order_by        if order_by is False:            order_by = self.select_mapper.order_by        if order_by is False:            order_by = []            if self.table.default_order_by() is not None:                order_by = self.table.default_order_by()            if from_obj.default_order_by() is not None:                order_by = from_obj.default_order_by()        try:            for_update = {'read':'read','update':True,'update_nowait':'nowait',None:False}[self._lockmode]        except KeyError:            raise exceptions.ArgumentError("Unknown lockmode '%s'" % self._lockmode)        # if single-table inheritance mapper, add "typecol IN (polymorphic)" criterion so        # that we only load the appropriate types        if self.select_mapper.single and self.select_mapper.polymorphic_on is not None and self.select_mapper.polymorphic_identity is not None:            whereclause = sql.and_(whereclause, self.select_mapper.polymorphic_on.in_([m.polymorphic_identity for m in self.select_mapper.polymorphic_iterator()]))        context.from_clause = from_obj        # give all the attached properties a chance to modify the query        # TODO: doing this off the select_mapper.  if its the polymorphic mapper, then        # it has no relations() on it.  should we compile those too into the query ?  (i.e. eagerloads)        for value in self.select_mapper.iterate_properties:            if self._only_load_props and value.key not in self._only_load_props:                continue            context.exec_with_path(self.select_mapper, value.key, value.setup, context, only_load_props=self._only_load_props)        # additional entities/columns, add those to selection criterion        for tup in self._entities:            (m, alias, alias_id) = tup            clauses = self._get_entity_clauses(tup)            if isinstance(m, mapper.Mapper):                for value in m.iterate_properties:                    context.exec_with_path(m, value.key, value.setup, context, parentclauses=clauses)            elif isinstance(m, sql.ColumnElement):                if clauses is not None:                    m = clauses.aliased_column(m)                context.secondary_columns.append(m)        if self._eager_loaders and self._nestable(**self._select_args()):            # eager loaders are present, and the SELECT has limiting criterion            # produce a "wrapped" selectable.            # ensure all 'order by' elements are ClauseElement instances            # (since they will potentially be aliased)            # locate all embedded Column clauses so they can be added to the            # "inner" select statement where they'll be available to the enclosing            # statement's "order by"            cf = util.Set()            if order_by:                order_by = [expression._literal_as_text(o) for o in util.to_list(order_by) or []]                for o in order_by:                    cf.update(sql_util.find_columns(o))            if adapter:                # TODO: make usage of the ClauseAdapter here to create the list                # of primary columns                context.primary_columns = [from_obj.corresponding_column(c) or c for c in context.primary_columns]

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -