📄 variables.py
字号:
## Copyright (c) 2006, 2007 Canonical## Written by Gustavo Niemeyer <gustavo@niemeyer.net>## This file is part of Storm Object Relational Mapper.## Storm is free software; you can redistribute it and/or modify# it under the terms of the GNU Lesser General Public License as# published by the Free Software Foundation; either version 2.1 of# the License, or (at your option) any later version.## Storm is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public License# along with this program. If not, see <http://www.gnu.org/licenses/>.#from datetime import datetime, date, time, timedeltafrom decimal import Decimalimport cPickle as picklefrom storm.exceptions import NoneErrorfrom storm import Undef__all__ = [ "VariableFactory", "Variable", "LazyValue", "BoolVariable", "IntVariable", "FloatVariable", "CharsVariable", "UnicodeVariable", "DateTimeVariable", "DateVariable", "TimeVariable", "TimeDeltaVariable", "EnumVariable", "PickleVariable", "ListVariable",]def VariableFactory(cls, **old_kwargs): """Build cls with kwargs of constructor updated by kwargs of call. This is really an implementation of partial/curry functions, and should be replaced by 'partial' once 2.5 is in use. """ def variable_factory(**new_kwargs): kwargs = old_kwargs.copy() kwargs.update(new_kwargs) return cls(**kwargs) return variable_factoryclass Variable(object): _value = Undef _lazy_value = Undef _saved_state = Undef _checkpoint_state = Undef _allow_none = True column = None event = None def __init__(self, value=Undef, value_factory=Undef, from_db=False, allow_none=True, column=None, event=None): if not allow_none: self._allow_none = False if value is not Undef: self.set(value, from_db) elif value_factory is not Undef: self.set(value_factory(), from_db) self.column = column self.event = event @staticmethod def _parse_get(value, to_db): return value @staticmethod def _parse_set(value, from_db): return value def get_lazy(self, default=None): if self._lazy_value is Undef: return default return self._lazy_value def get(self, default=None, to_db=False): if self._lazy_value is not Undef and self.event is not None: self.event.emit("resolve-lazy-value", self, self._lazy_value) value = self._value if value is Undef: return default if value is None: return value return self._parse_get(value, to_db) def set(self, value, from_db=False): if isinstance(value, LazyValue): self._lazy_value = value new_value = Undef else: if self._lazy_value is not Undef: del self._lazy_value if value is None: if self._allow_none is False: raise self._get_none_error() new_value = None else: new_value = self._parse_set(value, from_db) if from_db: # Prepare it for being used by the hook below. value = self._parse_get(new_value, False) old_value = self._value self._value = new_value if (self.event is not None and (self._lazy_value is not Undef or new_value != old_value)): if old_value is not None and old_value is not Undef: old_value = self._parse_get(old_value, False) self.event.emit("changed", self, old_value, value, from_db) def delete(self): old_value = self._value if old_value != Undef: self._value = Undef if self.event: if old_value is not None and old_value is not Undef: old_value = self._parse_get(old_value, False) self.event.emit("changed", self, old_value, Undef, False) def is_defined(self): return self._value is not Undef def has_changed(self): return (self._lazy_value is not Undef or self.get_state() != self._checkpoint_state) def get_state(self): return (self._lazy_value, self._value) def set_state(self, state): self._lazy_value, self._value = state def checkpoint(self): self._checkpoint_state = self.get_state() def copy(self): variable = object.__new__(self.__class__) variable.set_state(self.get_state()) return variable def __eq__(self, other): return (self.__class__ is other.__class__ and self._value == other._value) def __hash__(self): return hash(self._value) def _get_none_error(self): if not self.column: return NoneError("None isn't acceptable as a value") else: from storm.expr import compile, CompileError column = self.column.name if self.column.table is not Undef: try: table, parameters = compile(self.column.table) column = "%s.%s" % (table, column) except CompileError: pass return NoneError("None isn't acceptable as a value for %s" % column)class LazyValue(object): """Marker to be used as a base class on lazily evaluated values."""class BoolVariable(Variable): @staticmethod def _parse_set(value, from_db): if not isinstance(value, (int, long, float, Decimal)): raise TypeError("Expected bool, found %r: %r" % (type(value), value)) return bool(value)class IntVariable(Variable): @staticmethod def _parse_set(value, from_db): if not isinstance(value, (int, long, float, Decimal)): raise TypeError("Expected int, found %r: %r" % (type(value), value)) return int(value)class FloatVariable(Variable): @staticmethod def _parse_set(value, from_db): if not isinstance(value, (int, long, float, Decimal)): raise TypeError("Expected float, found %r: %r" % (type(value), value)) return float(value)class CharsVariable(Variable): @staticmethod def _parse_set(value, from_db): if isinstance(value, buffer): value = str(value) elif not isinstance(value, str): raise TypeError("Expected str, found %r: %r" % (type(value), value)) return valueclass UnicodeVariable(Variable): @staticmethod def _parse_set(value, from_db): if not isinstance(value, unicode): raise TypeError("Expected unicode, found %r: %r" % (type(value), value)) return valueclass DateTimeVariable(Variable): def __init__(self, *args, **kwargs): self._tzinfo = kwargs.pop("tzinfo", None) super(DateTimeVariable, self).__init__(*args, **kwargs) def _parse_set(self, value, db): if db: if isinstance(value, datetime): pass elif isinstance(value, (str, unicode)): if " " not in value: raise ValueError("Unknown date/time format: %r" % value) date_str, time_str = value.split(" ") value = datetime(*(_parse_date(date_str) + _parse_time(time_str))) else: raise TypeError("Expected datetime, found %s" % repr(value)) if self._tzinfo is not None: if value.tzinfo is None: value = value.replace(tzinfo=self._tzinfo) else: value = value.astimezone(self._tzinfo) else:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -