📄 variables.py
字号:
if type(value) in (int, long, float): value = datetime.utcfromtimestamp(value) elif not isinstance(value, datetime): raise TypeError("Expected datetime, found %s" % repr(value)) if self._tzinfo is not None: value = value.astimezone(self._tzinfo) return valueclass DateVariable(Variable): @staticmethod def _parse_set(value, db): if db: if value is None: return None if isinstance(value, date): return value if not isinstance(value, (str, unicode)): raise TypeError("Expected date, found %s" % repr(value)) if " " in value: value, time_str = value.split(" ") return date(*_parse_date(value)) else: if isinstance(value, datetime): return value.date() if not isinstance(value, date): raise TypeError("Expected date, found %s" % repr(value)) return valueclass TimeVariable(Variable): @staticmethod def _parse_set(value, db): if db: # XXX Can None ever get here, considering that set() checks for it? if value is None: return None if isinstance(value, time): return value if not isinstance(value, (str, unicode)): raise TypeError("Expected time, found %s" % repr(value)) if " " in value: date_str, value = value.split(" ") return time(*_parse_time(value)) else: if isinstance(value, datetime): return value.time() if not isinstance(value, time): raise TypeError("Expected time, found %s" % repr(value)) return valueclass TimeDeltaVariable(Variable): @staticmethod def _parse_set(value, db): if db: # XXX Can None ever get here, considering that set() checks for it? if value is None: return None if isinstance(value, timedelta): return value if not isinstance(value, (str, unicode)): raise TypeError("Expected timedelta, found %s" % repr(value)) (years, months, days, hours, minutes, seconds, microseconds) = _parse_interval(value) if years != 0: raise ValueError("Can not handle year intervals") if months != 0: raise ValueError("Can not handle month intervals") return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds, microseconds=microseconds) else: if not isinstance(value, timedelta): raise TypeError("Expected timedelta, found %s" % repr(value)) return valueclass EnumVariable(Variable): def __init__(self, get_map, set_map, *args, **kwargs): self._get_map = get_map self._set_map = set_map Variable.__init__(self, *args, **kwargs) def _parse_set(self, value, db): if db: return value try: return self._set_map[value] except KeyError: raise ValueError("Invalid enum value: %s" % repr(value)) def _parse_get(self, value, db): if db: return value try: return self._get_map[value] except KeyError: raise ValueError("Invalid enum value: %s" % repr(value))class PickleVariable(Variable): def __init__(self, *args, **kwargs): Variable.__init__(self, *args, **kwargs) if self.event: self.event.hook("flush", self._detect_changes) self.event.hook("object-deleted", self._detect_changes) def _detect_changes(self, obj_info): if self.get_state() != self._checkpoint_state: self.event.emit("changed", self, None, self._value, False) @staticmethod def _parse_set(value, db): if db: if isinstance(value, buffer): value = str(value) return pickle.loads(value) else: return value @staticmethod def _parse_get(value, db): if db: return pickle.dumps(value, -1) else: return value def get_state(self): return (self._lazy_value, pickle.dumps(self._value, -1)) def set_state(self, state): self._lazy_value = state[0] self._value = pickle.loads(state[1]) def __hash__(self): try: return hash(self._value) except TypeError: return hash(pickle.dumps(self._value, -1))class ListVariable(Variable): def __init__(self, item_factory, *args, **kwargs): self._item_factory = item_factory Variable.__init__(self, *args, **kwargs) if self.event: self.event.hook("flush", self._detect_changes) self.event.hook("object-deleted", self._detect_changes) def _detect_changes(self, obj_info): if self.get_state() != self._checkpoint_state: self.event.emit("changed", self, None, self._value, False) def _parse_set(self, value, db): if db: item_factory = self._item_factory return [item_factory(value=val, from_db=db).get() for val in value] else: return value def _parse_get(self, value, db): if db: item_factory = self._item_factory return [item_factory(value=val, from_db=db) for val in value] else: return value def get_state(self): return (self._lazy_value, pickle.dumps(self._value, -1)) def set_state(self, state): self._lazy_value = state[0] self._value = pickle.loads(state[1]) def __hash__(self): return hash(pickle.dumps(self._value, -1))def _parse_time(time_str): # TODO Add support for timezones. if ":" not in time_str: raise ValueError("Unknown time format: %r" % time_str) hour, minute, second = time_str.split(":") if "." in second: second, microsecond = second.split(".") second = int(second) microsecond = int(int(microsecond) * 10 ** (6 - len(microsecond))) return int(hour), int(minute), second, microsecond return int(hour), int(minute), int(second), 0def _parse_date(date_str): if "-" not in date_str: raise ValueError("Unknown date format: %r" % date_str) year, month, day = date_str.split("-") return int(year), int(month), int(day)# XXX: 20070208 jamesh# The function below comes from psycopgda.adapter, which is licensed# under the ZPL. Depending on Storm licensing, we may need to replace# it.def _parse_interval(interval_str): """Parses PostgreSQL interval notation and returns a tuple (years, months, days, hours, minutes, seconds). Values accepted: interval ::= date | time | date time date ::= date_comp | date date_comp date_comp ::= 1 'day' | number 'days' | 1 'month' | 1 'mon' | number 'months' | number 'mons' | 1 'year' | number 'years' time ::= number ':' number | number ':' number ':' number | number ':' number ':' number '.' fraction """ years = months = days = 0 hours = minutes = seconds = microseconds = 0 elements = interval_str.split() # Tests with 7.4.6 on Ubuntu 5.4 interval output returns 'mon' and 'mons' # and not 'month' or 'months' as expected. I've fixed this and left # the original matches there too in case this is dependant on # OS or PostgreSQL release. for i in range(0, len(elements) - 1, 2): count, unit = elements[i:i+2] if unit.endswith(','): unit = unit[:-1] if unit == 'day' and count == '1': days += 1 elif unit == 'days': days += int(count) elif unit == 'month' and count == '1': months += 1 elif unit == 'mon' and count == '1': months += 1 elif unit == 'months': months += int(count) elif unit == 'mons': months += int(count) elif unit == 'year' and count == '1': years += 1 elif unit == 'years': years += int(count) else: raise ValueError, 'unknown time interval %s %s' % (count, unit) if len(elements) % 2 == 1: hours, minutes, seconds, microseconds = _parse_time(elements[-1]) return (years, months, days, hours, minutes, seconds, microseconds)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -