⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 variables.py

📁 Python的一个ORM,现在很火
💻 PY
📖 第 1 页 / 共 2 页
字号:
            if type(value) in (int, long, float):                value = datetime.utcfromtimestamp(value)            elif not isinstance(value, datetime):                raise TypeError("Expected datetime, found %s" % repr(value))            if self._tzinfo is not None:                value = value.astimezone(self._tzinfo)        return valueclass DateVariable(Variable):    @staticmethod    def _parse_set(value, db):        if db:            if value is None:                return None            if isinstance(value, date):                return value            if not isinstance(value, (str, unicode)):                raise TypeError("Expected date, found %s" % repr(value))            if " " in value:                value, time_str = value.split(" ")            return date(*_parse_date(value))        else:            if isinstance(value, datetime):                return value.date()            if not isinstance(value, date):                raise TypeError("Expected date, found %s" % repr(value))            return valueclass TimeVariable(Variable):    @staticmethod    def _parse_set(value, db):        if db:            # XXX Can None ever get here, considering that set() checks for it?            if value is None:                return None            if isinstance(value, time):                return value            if not isinstance(value, (str, unicode)):                raise TypeError("Expected time, found %s" % repr(value))            if " " in value:                date_str, value = value.split(" ")            return time(*_parse_time(value))        else:            if isinstance(value, datetime):                return value.time()            if not isinstance(value, time):                raise TypeError("Expected time, found %s" % repr(value))            return valueclass TimeDeltaVariable(Variable):    @staticmethod    def _parse_set(value, db):        if db:            # XXX Can None ever get here, considering that set() checks for it?            if value is None:                return None            if isinstance(value, timedelta):                return value            if not isinstance(value, (str, unicode)):                raise TypeError("Expected timedelta, found %s" % repr(value))            (years, months, days,             hours, minutes, seconds, microseconds) = _parse_interval(value)            if years != 0:                raise ValueError("Can not handle year intervals")            if months != 0:                raise ValueError("Can not handle month intervals")            return timedelta(days=days, hours=hours,                             minutes=minutes, seconds=seconds,                             microseconds=microseconds)        else:            if not isinstance(value, timedelta):                raise TypeError("Expected timedelta, found %s" % repr(value))            return valueclass EnumVariable(Variable):    def __init__(self, get_map, set_map, *args, **kwargs):        self._get_map = get_map        self._set_map = set_map        Variable.__init__(self, *args, **kwargs)    def _parse_set(self, value, db):        if db:            return value        try:            return self._set_map[value]        except KeyError:            raise ValueError("Invalid enum value: %s" % repr(value))    def _parse_get(self, value, db):        if db:            return value        try:            return self._get_map[value]        except KeyError:            raise ValueError("Invalid enum value: %s" % repr(value))class PickleVariable(Variable):    def __init__(self, *args, **kwargs):        Variable.__init__(self, *args, **kwargs)        if self.event:            self.event.hook("flush", self._detect_changes)            self.event.hook("object-deleted", self._detect_changes)    def _detect_changes(self, obj_info):        if self.get_state() != self._checkpoint_state:            self.event.emit("changed", self, None, self._value, False)    @staticmethod    def _parse_set(value, db):        if db:            if isinstance(value, buffer):                value = str(value)            return pickle.loads(value)        else:            return value    @staticmethod    def _parse_get(value, db):        if db:            return pickle.dumps(value, -1)        else:            return value    def get_state(self):        return (self._lazy_value, pickle.dumps(self._value, -1))    def set_state(self, state):        self._lazy_value = state[0]        self._value = pickle.loads(state[1])    def __hash__(self):        try:            return hash(self._value)        except TypeError:            return hash(pickle.dumps(self._value, -1))class ListVariable(Variable):    def __init__(self, item_factory, *args, **kwargs):        self._item_factory = item_factory        Variable.__init__(self, *args, **kwargs)        if self.event:            self.event.hook("flush", self._detect_changes)            self.event.hook("object-deleted", self._detect_changes)    def _detect_changes(self, obj_info):        if self.get_state() != self._checkpoint_state:            self.event.emit("changed", self, None, self._value, False)    def _parse_set(self, value, db):        if db:            item_factory = self._item_factory            return [item_factory(value=val, from_db=db).get() for val in value]        else:            return value    def _parse_get(self, value, db):        if db:            item_factory = self._item_factory            return [item_factory(value=val, from_db=db) for val in value]        else:            return value    def get_state(self):        return (self._lazy_value, pickle.dumps(self._value, -1))    def set_state(self, state):        self._lazy_value = state[0]        self._value = pickle.loads(state[1])    def __hash__(self):        return hash(pickle.dumps(self._value, -1))def _parse_time(time_str):    # TODO Add support for timezones.    if ":" not in time_str:        raise ValueError("Unknown time format: %r" % time_str)    hour, minute, second = time_str.split(":")    if "." in second:        second, microsecond = second.split(".")        second = int(second)        microsecond = int(int(microsecond) * 10 ** (6 - len(microsecond)))        return int(hour), int(minute), second, microsecond    return int(hour), int(minute), int(second), 0def _parse_date(date_str):    if "-" not in date_str:        raise ValueError("Unknown date format: %r" % date_str)    year, month, day = date_str.split("-")    return int(year), int(month), int(day)# XXX: 20070208 jamesh# The function below comes from psycopgda.adapter, which is licensed# under the ZPL.  Depending on Storm licensing, we may need to replace# it.def _parse_interval(interval_str):    """Parses PostgreSQL interval notation and returns a tuple (years, months,    days, hours, minutes, seconds).    Values accepted:        interval  ::= date                   |  time                   |  date time        date      ::= date_comp                   |  date date_comp        date_comp ::= 1 'day'                   |  number 'days'                   |  1 'month'                   |  1 'mon'                   |  number 'months'                   |  number 'mons'                   |  1 'year'                   |  number 'years'        time      ::= number ':' number                   |  number ':' number ':' number                   |  number ':' number ':' number '.' fraction    """    years = months = days = 0    hours = minutes = seconds = microseconds = 0    elements = interval_str.split()    # Tests with 7.4.6 on Ubuntu 5.4 interval output returns 'mon' and 'mons'    # and not 'month' or 'months' as expected. I've fixed this and left    # the original matches there too in case this is dependant on    # OS or PostgreSQL release.    for i in range(0, len(elements) - 1, 2):        count, unit = elements[i:i+2]        if unit.endswith(','):            unit = unit[:-1]        if unit == 'day' and count == '1':            days += 1        elif unit == 'days':            days += int(count)        elif unit == 'month' and count == '1':            months += 1        elif unit == 'mon' and count == '1':            months += 1        elif unit == 'months':            months += int(count)        elif unit == 'mons':            months += int(count)        elif unit == 'year' and count == '1':            years += 1        elif unit == 'years':            years += int(count)        else:            raise ValueError, 'unknown time interval %s %s' % (count, unit)    if len(elements) % 2 == 1:        hours, minutes, seconds, microseconds = _parse_time(elements[-1])    return (years, months, days, hours, minutes, seconds, microseconds)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -