📄 store.py
字号:
result = self._store._connection.execute(select) values = result.get_one() if values: return self._load_objects(result, values) return None def first(self): """Return the first item from an ordered result set. Will raise UnorderedError if the result set isn't ordered. See also last(), one(), and any(). """ if self._order_by is Undef: raise UnorderedError("Can't use first() on unordered result set") return self.any() def last(self): """Return the last item from an ordered result set. Will raise UnorderedError if the result set isn't ordered. See also first(), one(), and any(). """ if self._order_by is Undef: raise UnorderedError("Can't use last() on unordered result set") if self._limit is not Undef: raise FeatureError("Can't use last() with a slice " "of defined stop index") select = self._get_select() select.offset = Undef select.limit = 1 select.order_by = [] for expr in self._order_by: if isinstance(expr, Desc): select.order_by.append(expr.expr) elif isinstance(expr, Asc): select.order_by.append(Desc(expr.expr)) else: select.order_by.append(Desc(expr)) result = self._store._connection.execute(select) values = result.get_one() if values: return self._load_objects(result, values) return None def one(self): """Return one item from a result set containing at most one item. Will raise NotOneError if the result set contains more than one item. See also first(), one(), and any(). """ select = self._get_select() # limit could be 1 due to slicing, for instance. if select.limit is not Undef and select.limit > 2: select.limit = 2 result = self._store._connection.execute(select) values = result.get_one() if result.get_one(): raise NotOneError("one() used with more than one result available") if values: return self._load_objects(result, values) return None def order_by(self, *args): if self._offset is not Undef or self._limit is not Undef: raise FeatureError("Can't reorder a sliced result set") self._order_by = args or Undef return self def remove(self): if self._offset is not Undef or self._limit is not Undef: raise FeatureError("Can't remove a sliced result set") if type(self._cls_spec_info) is tuple: raise FeatureError("Removing not yet supported with tuple finds") if self._select is not Undef: raise FeatureError("Removing isn't supported with " "set expressions (unions, etc)") self._store._connection.execute(Delete(self._where, self._cls_spec_info.table), noresult=True) def _aggregate(self, expr, column=None): if type(self._cls_spec_info) is tuple: default_tables = [cls_info.table for cls_info in self._cls_spec_info] else: default_tables = self._cls_spec_info.table if self._select is Undef: select = Select(expr, self._where, self._tables, default_tables) else: select = Select(expr, tables=Alias(self._select)) result = self._store._connection.execute(select) value = result.get_one()[0] if column is None: return value else: variable = column.variable_factory() result.set_variable(variable, value) return variable.get() def count(self, column=Undef, distinct=False): return int(self._aggregate(Count(column, distinct))) def max(self, column): return self._aggregate(Max(column), column) def min(self, column): return self._aggregate(Min(column), column) def avg(self, column): value = self._aggregate(Avg(column)) if value is None: return value return float(value) def sum(self, column): return self._aggregate(Sum(column), column) def values(self, *columns): if not columns: raise FeatureError("values() takes at least one column " "as argument") select = self._get_select() select.columns = columns result = self._store._connection.execute(select) if len(columns) == 1: variable = columns[0].variable_factory() for values in result: result.set_variable(variable, values[0]) yield variable.get() else: variables = [column.variable_factory() for column in columns] for values in result: for variable, value in zip(variables, values): result.set_variable(variable, value) yield tuple(variable.get() for variable in variables) def set(self, *args, **kwargs): """Update objects in the result set with the given arguments. This method will update all objects in the current result set to match expressions given as equalities or keyword arguments. These objects may still be in the database (an UPDATE is issued) or may be cached. For instance, C{result.set(Class.attr1 == 1, attr2=2)} will set C{attr1} to 1 and C{attr2} to 2, on all matching objects. """ if type(self._cls_spec_info) is tuple: raise FeatureError("Setting isn't supported with tuple finds") if self._select is not Undef: raise FeatureError("Setting isn't supported with " "set expressions (unions, etc)") if not (args or kwargs): return changes = {} cls = self._cls_spec_info.cls # For now only "Class.attr == var" is supported in args. for expr in args: if (not isinstance(expr, Eq) or not isinstance(expr.expr1, Column) or not isinstance(expr.expr2, (Column, Variable))): raise FeatureError("Unsupported set expression: %r" % repr(expr)) changes[expr.expr1] = expr.expr2 for key, value in kwargs.items(): column = getattr(cls, key) if value is None: changes[column] = None elif isinstance(value, Expr): if not isinstance(value, Column): raise FeatureError("Unsupported set expression: %r" % repr(value)) changes[column] = value else: changes[column] = column.variable_factory(value=value) expr = Update(changes, self._where, self._cls_spec_info.table) self._store._connection.execute(expr, noresult=True) try: cached = self.cached() except CompileError: for obj_info in self._store._iter_cached(): for column in changes: obj_info.variables[column].set(AutoReload) else: changes = changes.items() for obj in cached: for column, value in changes: variables = get_obj_info(obj).variables if value is None: pass elif isinstance(value, Variable): value = value.get() else: value = variables[value].get() variables[column].set(value) variables[column].checkpoint() def cached(self): """Return matching objects from the cache for the current query.""" if type(self._cls_spec_info) is tuple: raise FeatureError("Cached finds not supported with tuples") if self._tables is not Undef: raise FeatureError("Cached finds not supported with custom tables") if self._where is Undef: match = None else: match = compile_python(self._where) name_to_column = dict((column.name, column) for column in self._cls_spec_info.columns) def get_column(name, name_to_column=name_to_column): return obj_info.variables[name_to_column[name]].get() objects = [] cls = self._cls_spec_info.cls for obj_info in self._store._iter_cached(): try: if (obj_info.cls_info is self._cls_spec_info and (match is None or match(get_column))): obj = obj_info.get_obj() if obj is not None: objects.append(obj) except LostObjectError: pass return objects def _set_expr(self, expr_cls, other, all=False): if self._cls_spec_info != other._cls_spec_info: raise FeatureError("Incompatible results for set operation") expr = expr_cls(self._get_select(), other._get_select(), all=all) return ResultSet(self._store, self._cls_spec_info, select=expr) def union(self, other, all=False): if isinstance(other, EmptyResultSet): return self return self._set_expr(Union, other, all) def difference(self, other, all=False): if isinstance(other, EmptyResultSet): return self return self._set_expr(Except, other, all) def intersection(self, other, all=False): if isinstance(other, EmptyResultSet): return other return self._set_expr(Intersect, other, all)class EmptyResultSet(object): def __init__(self, ordered=False): self._order_by = ordered def _get_select(self): return Select(SQLRaw("1"), SQLRaw("1 = 2")) def copy(self): result = EmptyResultSet(self._order_by) return result def config(self, distinct=None, offset=None, limit=None): pass def __iter__(self): return yield None def __getitem__(self, index): return self.copy() def any(self): return None def first(self): if self._order_by: return None raise UnorderedError("Can't use first() on unordered result set") def last(self): if self._order_by: return None raise UnorderedError("Can't use last() on unordered result set") def one(self): return None def order_by(self, *args): self._order_by = True return self def remove(self): pass def count(self, column=Undef, distinct=False): return 0 def max(self, column): return None def min(self, column): return None def avg(self, column): return None def sum(self, column): return None def values(self, *columns): if not columns: raise FeatureError("values() takes at least one column " "as argument") return yield None def set(self, *args, **kwargs): pass def cached(self): return [] def union(self, other): if isinstance(other, EmptyResultSet): return self return other.union(self) def difference(self, other): return self def intersection(self, other): return selfclass TableSet(object): def __init__(self, store, tables): self._store = store self._tables = tables def find(self, cls_spec, *args, **kwargs): self._store.flush() if type(cls_spec) is tuple: cls_spec_info = tuple(get_cls_info(cls) for cls in cls_spec) where = get_where_for_args(args, kwargs) else: cls_spec_info = get_cls_info(cls_spec) where = get_where_for_args(args, kwargs, cls_spec) return self._store._result_set_factory(self._store, cls_spec_info, where, self._tables)Store._result_set_factory = ResultSetStore._table_set = TableSetdef get_where_for_args(args, kwargs, cls=None): equals = list(args) if kwargs: if cls is None: raise FeatureError("Can't determine class that keyword " "arguments are associated with") for key, value in kwargs.items(): equals.append(getattr(cls, key) == value) if equals: return And(*equals) return Undefclass AutoReload(LazyValue): passAutoReload = AutoReload()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -