⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlobject.py

📁 Python的一个ORM,现在很火
💻 PY
📖 第 1 页 / 共 2 页
字号:
    def _get_store():        raise NotImplementedError("SQLObjectBase._get_store() "                                  "must be implemented")    @classmethod    def delete(cls, id):        # destroySelf() should be extended to support cascading, so        # we'll mimic what SQLObject does here, even if more expensive.        obj = cls.get(id)        obj.destroySelf()    @classmethod    def get(cls, id):        store = cls._get_store()        obj = store.get(cls, id)        if obj is None:            raise SQLObjectNotFound("Object not found")        return obj    @classmethod    def _parse_orderBy(cls, orderBy):        result = []        if not isinstance(orderBy, (tuple, list)):            orderBy = (orderBy,)        for item in orderBy:            if isinstance(item, basestring):                desc = item.startswith("-")                if desc:                    item = item[1:]                item = cls._attr_to_prop.get(item, item)                if desc:                    item = Desc(item)            result.append(item)        return tuple(result)    @classmethod    def _find(cls, clause=None, clauseTables=None, orderBy=None,              limit=None, distinct=None, prejoins=_IGNORED,              prejoinClauseTables=_IGNORED, _by={}):        store = cls._get_store()        if clause is None:            args = ()        else:            args = (clause,)        if clauseTables is not None:            clauseTables = set(table.lower() for table in clauseTables)            clauseTables.add(cls.__storm_table__.lower())            store = store.using(*clauseTables)        result = store.find(cls, *args, **_by)        if orderBy is not None:            result.order_by(*cls._parse_orderBy(orderBy))        result.config(limit=limit, distinct=distinct)        return result    @classmethod    def select(cls, *args, **kwargs):        result = cls._find(*args, **kwargs)        return SQLObjectResultSet(result, cls)    @classmethod    def selectBy(cls, orderBy=None, **kwargs):        result = cls._find(orderBy=orderBy, _by=kwargs)        return SQLObjectResultSet(result, cls)    @classmethod    def selectOne(cls, *args, **kwargs):        return cls._find(*args, **kwargs).one()    @classmethod    def selectOneBy(cls, **kwargs):        return cls._find(_by=kwargs).one()    @classmethod    def selectFirst(cls, *args, **kwargs):        return cls._find(*args, **kwargs).first()    @classmethod    def selectFirstBy(cls, orderBy=None, **kwargs):        return cls._find(orderBy=orderBy, _by=kwargs).first()    # Dummy methods.    def sync(self): pass    def syncUpdate(self): passclass SQLObjectResultSet(object):    def __init__(self, result_set, cls):        self._result_set = result_set        self._cls = cls    def count(self):        return self._result_set.count()    def __iter__(self):        return self._result_set.__iter__()    def __getitem__(self, index):        result_set = self._result_set[index]        if isinstance(index, slice):            return self.__class__(result_set, self._cls)        return result_set    def __nonzero__(self):        return self._result_set.any() is not None    def orderBy(self, orderBy):        result_set = self._result_set.copy()        result_set.order_by(*self._cls._parse_orderBy(orderBy))        return self.__class__(result_set, self._cls)    def limit(self, limit):        result_set = self._result_set.copy().config(limit=limit)        return self.__class__(result_set, self._cls)    def distinct(self):        result_set = self._result_set.copy().config(distinct=True)        result_set.order_by() # Remove default order.        return self.__class__(result_set, self._cls)    def union(self, otherSelect, unionAll=False, orderBy=None):        result_set = self._result_set.union(otherSelect._result_set,                                            all=unionAll)        result_set.order_by() # Remove default order.        new = self.__class__(result_set, self._cls)        if orderBy is not None:            return new.orderBy(orderBy)        return new    def except_(self, otherSelect, exceptAll=False, orderBy=None):        result_set = self._result_set.difference(otherSelect._result_set,                                                 all=exceptAll)        result_set.order_by() # Remove default order.        new = self.__class__(result_set, self._cls)        if orderBy is not None:            return new.orderBy(orderBy)        return new    def intersect(self, otherSelect, intersectAll=False, orderBy=None):        result_set = self._result_set.intersection(otherSelect._result_set,                                                   all=intersectAll)        new = self.__class__(result_set, self._cls)        if orderBy is not None:            return new.orderBy(orderBy)        return new    def prejoin(self, prejoins):        return self    def prejoinClauseTables(self, prejoinClauseTables):        return selfclass PropertyAdapter(object):    _kwargs = {}    def __init__(self, dbName=None, notNull=False, default=Undef,                 alternateID=None, unique=_IGNORED, name=_IGNORED,                 alternateMethodName=None, length=_IGNORED, immutable=None,                 prejoins=_IGNORED):        if default is None and notNull:            raise RuntimeError("Can't use default=None and notNull=True")        self.dbName = dbName        self.alternateID = alternateID        self.alternateMethodName = alternateMethodName        # XXX Implement handler for:        #        #   - immutable (causes setting the attribute to fail)        #        # XXX Implement tests for ignored parameters:        #        #   - unique (for tablebuilder)        #   - length (for tablebuilder for StringCol)        #   - name (for _columns stuff)        #   - prejoins        if callable(default):            default_factory = default            default = Undef        else:            default_factory = Undef        super(PropertyAdapter, self).__init__(dbName, allow_none=not notNull,                                              default_factory=default_factory,                                              default=default, **self._kwargs)class AutoUnicodeVariable(Variable):    """Unlike UnicodeVariable, this will try to convert str to unicode."""    @staticmethod    def _parse_set(value, from_db):        if not isinstance(value, basestring):            raise TypeError("Expected basestring, found %s" % repr(type(value)))        return unicode(value)class AutoUnicode(SimpleProperty):    variable_class = AutoUnicodeVariableclass StringCol(PropertyAdapter, AutoUnicode):    passclass IntCol(PropertyAdapter, Int):    passclass BoolCol(PropertyAdapter, Bool):    passclass FloatCol(PropertyAdapter, Float):    passclass UtcDateTimeCol(PropertyAdapter, DateTime):    _kwargs = {"tzinfo": tzutc()}class DateCol(PropertyAdapter, Date):    passclass IntervalCol(PropertyAdapter, TimeDelta):    passclass ForeignKey(object):    def __init__(self, foreignKey, **kwargs):        self.foreignKey = foreignKey        self.kwargs = kwargsclass SQLMultipleJoin(ReferenceSet):    def __init__(self, otherClass=None, joinColumn=None,                 intermediateTable=None, otherColumn=None, orderBy=None,                 prejoins=_IGNORED):        if intermediateTable:            args = ("<primary key>",                    "%s.%s" % (intermediateTable, joinColumn),                    "%s.%s" % (intermediateTable, otherColumn),                    "%s.<primary key>" % otherClass)        else:            args = ("<primary key>", "%s.%s" % (otherClass, joinColumn))        ReferenceSet.__init__(self, *args)        self._orderBy = orderBy    def __get__(self, obj, cls=None):        if obj is None:            return self        bound_reference_set = ReferenceSet.__get__(self, obj)        target_cls = bound_reference_set._target_cls        result_set = bound_reference_set.find()        if self._orderBy:            result_set.order_by(*target_cls._parse_orderBy(self._orderBy))        return SQLObjectResultSet(result_set, target_cls)SQLRelatedJoin = SQLMultipleJoinclass CONTAINSSTRING(Like):    def __init__(self, expr, string):        string = string.replace("!", "!!") \                       .replace("_", "!_") \                       .replace("%", "!%")        Like.__init__(self, expr, "%"+string+"%", SQLRaw("'!'"))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -