⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 store.py

📁 Python的一个ORM,现在很火
💻 PY
📖 第 1 页 / 共 3 页
字号:
        result = self._store._connection.execute(select)        values = result.get_one()        if values:            return self._load_objects(result, values)        return None    def first(self):        """Return the first item from an ordered result set.        Will raise UnorderedError if the result set isn't ordered.        See also last(), one(), and any().        """        if self._order_by is Undef:            raise UnorderedError("Can't use first() on unordered result set")        return self.any()    def last(self):        """Return the last item from an ordered result set.        Will raise UnorderedError if the result set isn't ordered.        See also first(), one(), and any().        """        if self._order_by is Undef:            raise UnorderedError("Can't use last() on unordered result set")        if self._limit is not Undef:            raise FeatureError("Can't use last() with a slice "                               "of defined stop index")        select = self._get_select()        select.offset = Undef        select.limit = 1        select.order_by = []        for expr in self._order_by:            if isinstance(expr, Desc):                select.order_by.append(expr.expr)            elif isinstance(expr, Asc):                select.order_by.append(Desc(expr.expr))            else:                select.order_by.append(Desc(expr))        result = self._store._connection.execute(select)        values = result.get_one()        if values:            return self._load_objects(result, values)        return None    def one(self):        """Return one item from a result set containing at most one item.        Will raise NotOneError if the result set contains more than one item.        See also first(), one(), and any().        """        select = self._get_select()        # limit could be 1 due to slicing, for instance.        if select.limit is not Undef and select.limit > 2:            select.limit = 2        result = self._store._connection.execute(select)        values = result.get_one()        if result.get_one():            raise NotOneError("one() used with more than one result available")        if values:            return self._load_objects(result, values)        return None    def order_by(self, *args):        if self._offset is not Undef or self._limit is not Undef:            raise FeatureError("Can't reorder a sliced result set")        self._order_by = args or Undef        return self    def remove(self):        if self._offset is not Undef or self._limit is not Undef:            raise FeatureError("Can't remove a sliced result set")        if type(self._cls_spec_info) is tuple:            raise FeatureError("Removing not yet supported with tuple finds")        if self._select is not Undef:            raise FeatureError("Removing isn't supported with "                               "set expressions (unions, etc)")        self._store._connection.execute(Delete(self._where,                                               self._cls_spec_info.table),                                        noresult=True)    def _aggregate(self, expr, column=None):        if type(self._cls_spec_info) is tuple:            default_tables = [cls_info.table                              for cls_info in self._cls_spec_info]        else:            default_tables = self._cls_spec_info.table        if self._select is Undef:            select = Select(expr, self._where, self._tables, default_tables)        else:            select = Select(expr, tables=Alias(self._select))        result = self._store._connection.execute(select)        value = result.get_one()[0]        if column is None:            return value        else:            variable = column.variable_factory()            result.set_variable(variable, value)            return variable.get()    def count(self, column=Undef, distinct=False):        return int(self._aggregate(Count(column, distinct)))    def max(self, column):        return self._aggregate(Max(column), column)    def min(self, column):        return self._aggregate(Min(column), column)    def avg(self, column):        value = self._aggregate(Avg(column))        if value is None:            return value        return float(value)    def sum(self, column):        return self._aggregate(Sum(column), column)    def values(self, *columns):        if not columns:            raise FeatureError("values() takes at least one column "                               "as argument")        select = self._get_select()        select.columns = columns        result = self._store._connection.execute(select)        if len(columns) == 1:            variable = columns[0].variable_factory()            for values in result:                result.set_variable(variable, values[0])                yield variable.get()        else:            variables = [column.variable_factory() for column in columns]            for values in result:                for variable, value in zip(variables, values):                    result.set_variable(variable, value)                yield tuple(variable.get() for variable in variables)    def set(self, *args, **kwargs):        """Update objects in the result set with the given arguments.        This method will update all objects in the current result set        to match expressions given as equalities or keyword arguments.        These objects may still be in the database (an UPDATE is issued)        or may be cached.        For instance, C{result.set(Class.attr1 == 1, attr2=2)} will set        C{attr1} to 1 and C{attr2} to 2, on all matching objects.        """        if type(self._cls_spec_info) is tuple:            raise FeatureError("Setting isn't supported with tuple finds")        if self._select is not Undef:            raise FeatureError("Setting isn't supported with "                               "set expressions (unions, etc)")        if not (args or kwargs):            return        changes = {}        cls = self._cls_spec_info.cls        # For now only "Class.attr == var" is supported in args.        for expr in args:            if (not isinstance(expr, Eq) or                not isinstance(expr.expr1, Column) or                not isinstance(expr.expr2, (Column, Variable))):                raise FeatureError("Unsupported set expression: %r" %                                   repr(expr))            changes[expr.expr1] = expr.expr2        for key, value in kwargs.items():            column = getattr(cls, key)            if value is None:                changes[column] = None            elif isinstance(value, Expr):                if not isinstance(value, Column):                    raise FeatureError("Unsupported set expression: %r" %                                       repr(value))                changes[column] = value            else:                changes[column] = column.variable_factory(value=value)        expr = Update(changes, self._where, self._cls_spec_info.table)        self._store._connection.execute(expr, noresult=True)        try:            cached = self.cached()        except CompileError:            for obj_info in self._store._iter_cached():                for column in changes:                    obj_info.variables[column].set(AutoReload)        else:            changes = changes.items()            for obj in cached:                for column, value in changes:                    variables = get_obj_info(obj).variables                    if value is None:                        pass                    elif isinstance(value, Variable):                        value = value.get()                    else:                        value = variables[value].get()                    variables[column].set(value)                    variables[column].checkpoint()    def cached(self):        """Return matching objects from the cache for the current query."""        if type(self._cls_spec_info) is tuple:            raise FeatureError("Cached finds not supported with tuples")        if self._tables is not Undef:            raise FeatureError("Cached finds not supported with custom tables")        if self._where is Undef:            match = None        else:            match = compile_python(self._where)            name_to_column = dict((column.name, column)                                  for column in self._cls_spec_info.columns)            def get_column(name, name_to_column=name_to_column):                return obj_info.variables[name_to_column[name]].get()        objects = []        cls = self._cls_spec_info.cls        for obj_info in self._store._iter_cached():            try:                if (obj_info.cls_info is self._cls_spec_info and                    (match is None or match(get_column))):                    obj = obj_info.get_obj()                    if obj is not None:                        objects.append(obj)            except LostObjectError:                pass        return objects    def _set_expr(self, expr_cls, other, all=False):        if self._cls_spec_info != other._cls_spec_info:            raise FeatureError("Incompatible results for set operation")        expr = expr_cls(self._get_select(), other._get_select(), all=all)        return ResultSet(self._store, self._cls_spec_info, select=expr)    def union(self, other, all=False):        if isinstance(other, EmptyResultSet):            return self        return self._set_expr(Union, other, all)    def difference(self, other, all=False):        if isinstance(other, EmptyResultSet):            return self        return self._set_expr(Except, other, all)    def intersection(self, other, all=False):        if isinstance(other, EmptyResultSet):            return other        return self._set_expr(Intersect, other, all)class EmptyResultSet(object):    def __init__(self, ordered=False):        self._order_by = ordered    def _get_select(self):        return Select(SQLRaw("1"), SQLRaw("1 = 2"))    def copy(self):        result = EmptyResultSet(self._order_by)        return result    def config(self, distinct=None, offset=None, limit=None):        pass    def __iter__(self):        return        yield None    def __getitem__(self, index):        return self.copy()    def any(self):        return None    def first(self):        if self._order_by:            return None        raise UnorderedError("Can't use first() on unordered result set")    def last(self):        if self._order_by:            return None        raise UnorderedError("Can't use last() on unordered result set")    def one(self):        return None    def order_by(self, *args):        self._order_by = True        return self    def remove(self):        pass    def count(self, column=Undef, distinct=False):        return 0    def max(self, column):        return None    def min(self, column):        return None    def avg(self, column):        return None    def sum(self, column):        return None    def values(self, *columns):        if not columns:            raise FeatureError("values() takes at least one column "                               "as argument")        return        yield None    def set(self, *args, **kwargs):        pass    def cached(self):        return []    def union(self, other):        if isinstance(other, EmptyResultSet):            return self        return other.union(self)    def difference(self, other):        return self    def intersection(self, other):        return selfclass TableSet(object):        def __init__(self, store, tables):        self._store = store        self._tables = tables    def find(self, cls_spec, *args, **kwargs):        self._store.flush()        if type(cls_spec) is tuple:            cls_spec_info = tuple(get_cls_info(cls) for cls in cls_spec)            where = get_where_for_args(args, kwargs)        else:            cls_spec_info = get_cls_info(cls_spec)            where = get_where_for_args(args, kwargs, cls_spec)        return self._store._result_set_factory(self._store, cls_spec_info,                                               where, self._tables)Store._result_set_factory = ResultSetStore._table_set = TableSetdef get_where_for_args(args, kwargs, cls=None):    equals = list(args)    if kwargs:        if cls is None:            raise FeatureError("Can't determine class that keyword "                               "arguments are associated with")        for key, value in kwargs.items():            equals.append(getattr(cls, key) == value)    if equals:        return And(*equals)    return Undefclass AutoReload(LazyValue):    passAutoReload = AutoReload()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -