📄 sqlobject.py
字号:
def _get_store(): raise NotImplementedError("SQLObjectBase._get_store() " "must be implemented") @classmethod def delete(cls, id): # destroySelf() should be extended to support cascading, so # we'll mimic what SQLObject does here, even if more expensive. obj = cls.get(id) obj.destroySelf() @classmethod def get(cls, id): store = cls._get_store() obj = store.get(cls, id) if obj is None: raise SQLObjectNotFound("Object not found") return obj @classmethod def _parse_orderBy(cls, orderBy): result = [] if not isinstance(orderBy, (tuple, list)): orderBy = (orderBy,) for item in orderBy: if isinstance(item, basestring): desc = item.startswith("-") if desc: item = item[1:] item = cls._attr_to_prop.get(item, item) if desc: item = Desc(item) result.append(item) return tuple(result) @classmethod def _find(cls, clause=None, clauseTables=None, orderBy=None, limit=None, distinct=None, prejoins=_IGNORED, prejoinClauseTables=_IGNORED, _by={}): store = cls._get_store() if clause is None: args = () else: args = (clause,) if clauseTables is not None: clauseTables = set(table.lower() for table in clauseTables) clauseTables.add(cls.__storm_table__.lower()) store = store.using(*clauseTables) result = store.find(cls, *args, **_by) if orderBy is not None: result.order_by(*cls._parse_orderBy(orderBy)) result.config(limit=limit, distinct=distinct) return result @classmethod def select(cls, *args, **kwargs): result = cls._find(*args, **kwargs) return SQLObjectResultSet(result, cls) @classmethod def selectBy(cls, orderBy=None, **kwargs): result = cls._find(orderBy=orderBy, _by=kwargs) return SQLObjectResultSet(result, cls) @classmethod def selectOne(cls, *args, **kwargs): return cls._find(*args, **kwargs).one() @classmethod def selectOneBy(cls, **kwargs): return cls._find(_by=kwargs).one() @classmethod def selectFirst(cls, *args, **kwargs): return cls._find(*args, **kwargs).first() @classmethod def selectFirstBy(cls, orderBy=None, **kwargs): return cls._find(orderBy=orderBy, _by=kwargs).first() # Dummy methods. def sync(self): pass def syncUpdate(self): passclass SQLObjectResultSet(object): def __init__(self, result_set, cls): self._result_set = result_set self._cls = cls def count(self): return self._result_set.count() def __iter__(self): return self._result_set.__iter__() def __getitem__(self, index): result_set = self._result_set[index] if isinstance(index, slice): return self.__class__(result_set, self._cls) return result_set def __nonzero__(self): return self._result_set.any() is not None def orderBy(self, orderBy): result_set = self._result_set.copy() result_set.order_by(*self._cls._parse_orderBy(orderBy)) return self.__class__(result_set, self._cls) def limit(self, limit): result_set = self._result_set.copy().config(limit=limit) return self.__class__(result_set, self._cls) def distinct(self): result_set = self._result_set.copy().config(distinct=True) result_set.order_by() # Remove default order. return self.__class__(result_set, self._cls) def union(self, otherSelect, unionAll=False, orderBy=None): result_set = self._result_set.union(otherSelect._result_set, all=unionAll) result_set.order_by() # Remove default order. new = self.__class__(result_set, self._cls) if orderBy is not None: return new.orderBy(orderBy) return new def except_(self, otherSelect, exceptAll=False, orderBy=None): result_set = self._result_set.difference(otherSelect._result_set, all=exceptAll) result_set.order_by() # Remove default order. new = self.__class__(result_set, self._cls) if orderBy is not None: return new.orderBy(orderBy) return new def intersect(self, otherSelect, intersectAll=False, orderBy=None): result_set = self._result_set.intersection(otherSelect._result_set, all=intersectAll) new = self.__class__(result_set, self._cls) if orderBy is not None: return new.orderBy(orderBy) return new def prejoin(self, prejoins): return self def prejoinClauseTables(self, prejoinClauseTables): return selfclass PropertyAdapter(object): _kwargs = {} def __init__(self, dbName=None, notNull=False, default=Undef, alternateID=None, unique=_IGNORED, name=_IGNORED, alternateMethodName=None, length=_IGNORED, immutable=None, prejoins=_IGNORED): if default is None and notNull: raise RuntimeError("Can't use default=None and notNull=True") self.dbName = dbName self.alternateID = alternateID self.alternateMethodName = alternateMethodName # XXX Implement handler for: # # - immutable (causes setting the attribute to fail) # # XXX Implement tests for ignored parameters: # # - unique (for tablebuilder) # - length (for tablebuilder for StringCol) # - name (for _columns stuff) # - prejoins if callable(default): default_factory = default default = Undef else: default_factory = Undef super(PropertyAdapter, self).__init__(dbName, allow_none=not notNull, default_factory=default_factory, default=default, **self._kwargs)class AutoUnicodeVariable(Variable): """Unlike UnicodeVariable, this will try to convert str to unicode.""" @staticmethod def _parse_set(value, from_db): if not isinstance(value, basestring): raise TypeError("Expected basestring, found %s" % repr(type(value))) return unicode(value)class AutoUnicode(SimpleProperty): variable_class = AutoUnicodeVariableclass StringCol(PropertyAdapter, AutoUnicode): passclass IntCol(PropertyAdapter, Int): passclass BoolCol(PropertyAdapter, Bool): passclass FloatCol(PropertyAdapter, Float): passclass UtcDateTimeCol(PropertyAdapter, DateTime): _kwargs = {"tzinfo": tzutc()}class DateCol(PropertyAdapter, Date): passclass IntervalCol(PropertyAdapter, TimeDelta): passclass ForeignKey(object): def __init__(self, foreignKey, **kwargs): self.foreignKey = foreignKey self.kwargs = kwargsclass SQLMultipleJoin(ReferenceSet): def __init__(self, otherClass=None, joinColumn=None, intermediateTable=None, otherColumn=None, orderBy=None, prejoins=_IGNORED): if intermediateTable: args = ("<primary key>", "%s.%s" % (intermediateTable, joinColumn), "%s.%s" % (intermediateTable, otherColumn), "%s.<primary key>" % otherClass) else: args = ("<primary key>", "%s.%s" % (otherClass, joinColumn)) ReferenceSet.__init__(self, *args) self._orderBy = orderBy def __get__(self, obj, cls=None): if obj is None: return self bound_reference_set = ReferenceSet.__get__(self, obj) target_cls = bound_reference_set._target_cls result_set = bound_reference_set.find() if self._orderBy: result_set.order_by(*target_cls._parse_orderBy(self._orderBy)) return SQLObjectResultSet(result_set, target_cls)SQLRelatedJoin = SQLMultipleJoinclass CONTAINSSTRING(Like): def __init__(self, expr, string): string = string.replace("!", "!!") \ .replace("_", "!_") \ .replace("%", "!%") Like.__init__(self, expr, "%"+string+"%", SQLRaw("'!'"))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -