⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 expression.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
    def __setitem__(self, key, value):        if key in self:            # this warning is primarily to catch select() statements which have conflicting            # column names in their exported columns collection            existing = self[key]            if not existing.shares_lineage(value):                table = getattr(existing, 'table', None) and existing.table.description                util.warn(("Column %r on table %r being replaced by another "                           "column with the same key.  Consider use_labels "                           "for select() statements.")  % (key, table))        util.OrderedProperties.__setitem__(self, key, value)    def remove(self, column):        del self[column.key]    def extend(self, iter):        for c in iter:            self.add(c)    def __eq__(self, other):        l = []        for c in other:            for local in self:                if c.shares_lineage(local):                    l.append(c==local)        return and_(*l)    def __contains__(self, other):        if not isinstance(other, basestring):            raise exceptions.ArgumentError("__contains__ requires a string argument")        return util.OrderedProperties.__contains__(self, other)    def contains_column(self, col):        # have to use a Set here, because it will compare the identity        # of the column, not just using "==" for comparison which will always return a        # "True" value (i.e. a BinaryClause...)        return col in util.Set(self)class ColumnSet(util.OrderedSet):    def contains_column(self, col):        return col in self    def extend(self, cols):        for col in cols:            self.add(col)    def __add__(self, other):        return list(self) + list(other)    def __eq__(self, other):        l = []        for c in other:            for local in self:                if c.shares_lineage(local):                    l.append(c==local)        return and_(*l)class Selectable(ClauseElement):    """mark a class as being selectable"""class FromClause(Selectable):    """Represent an element that can be used within the ``FROM`` clause of a ``SELECT`` statement."""    __visit_name__ = 'fromclause'    named_with_column=False    _hide_froms = []    def __init__(self):        self.oid_column = None    def _get_from_objects(self, **modifiers):        return []    def default_order_by(self):        return [self.oid_column]    def count(self, whereclause=None, **params):        """return a SELECT COUNT generated against this ``FromClause``."""        if self.primary_key:            col = list(self.primary_key)[0]        else:            col = list(self.columns)[0]        return select([func.count(col).label('tbl_row_count')], whereclause, from_obj=[self], **params)    def select(self, whereclauses = None, **params):        """return a SELECT of this ``FromClause``."""        return select([self], whereclauses, **params)    def join(self, right, *args, **kwargs):        """return a join of this ``FromClause`` against another ``FromClause``."""        return Join(self, right, *args, **kwargs)    def outerjoin(self, right, *args, **kwargs):        """return an outer join of this ``FromClause`` against another ``FromClause``."""        return Join(self, right, isouter=True, *args, **kwargs)    def alias(self, name=None):        """return an alias of this ``FromClause`` against another ``FromClause``."""        return Alias(self, name)    def is_derived_from(self, fromclause):        """Return True if this FromClause is 'derived' from the given FromClause.        An example would be an Alias of a Table is derived from that Table.        """        return fromclause in util.Set(self._cloned_set)    def replace_selectable(self, old, alias):      """replace all occurences of FromClause 'old' with the given Alias object, returning a copy of this ``FromClause``."""      global ClauseAdapter      if ClauseAdapter is None:          from sqlalchemy.sql.util import ClauseAdapter      return ClauseAdapter(alias).traverse(self, clone=True)    def corresponding_column(self, column, require_embedded=False):        """Given a ``ColumnElement``, return the exported ``ColumnElement``        object from this ``Selectable`` which corresponds to that        original ``Column`` via a common anscestor column.        column          the target ``ColumnElement`` to be matched        require_embedded          only return corresponding columns for the given          ``ColumnElement``, if the given ``ColumnElement`` is          actually present within a sub-element of this          ``FromClause``.  Normally the column will match if it merely          shares a common anscestor with one of the exported columns          of this ``FromClause``.        """        # dont dig around if the column is locally present        if self.c.contains_column(column):            return column        col, intersect = None, None        target_set = column.proxy_set        for c in self.c + [self.oid_column]:            i = c.proxy_set.intersection(target_set)            if i and \                (not require_embedded or c.proxy_set.issuperset(target_set)) and \                (intersect is None or len(i) > len(intersect)):                col, intersect = c, i        return col    def description(self):        """a brief description of this FromClause.        Used primarily for error message formatting.        """        return getattr(self, 'name', self.__class__.__name__ + " object")    description = property(description)    def _clone_from_clause(self):        # delete all the "generated" collections of columns for a        # newly cloned FromClause, so that they will be re-derived        # from the item.  this is because FromClause subclasses, when        # cloned, need to reestablish new "proxied" columns that are        # linked to the new item        for attr in ('_columns', '_primary_key' '_foreign_keys', '_oid_column', '_embedded_columns', '_all_froms'):            if hasattr(self, attr):                delattr(self, attr)    def _expr_attr_func(name):        def attr(self):            try:                return getattr(self, name)            except AttributeError:                self._export_columns()                return getattr(self, name)        return property(attr)    columns = c = _expr_attr_func('_columns')    primary_key = _expr_attr_func('_primary_key')    foreign_keys = _expr_attr_func('_foreign_keys')    def _export_columns(self, columns=None):        """Initialize column collections."""        if hasattr(self, '_columns') and columns is None:            return        self._columns = ColumnCollection()        self._primary_key = ColumnSet()        self._foreign_keys = util.Set()        if columns is None:            columns = self._flatten_exportable_columns()        for co in columns:            cp = self._proxy_column(co)    def _flatten_exportable_columns(self):        """Return the list of ColumnElements represented within this FromClause's _exportable_columns"""        export = self._exportable_columns()        for column in export:            if isinstance(column, Selectable):                for co in column.columns:                    yield co            elif isinstance(column, ColumnElement):                yield column            else:                continue    def _exportable_columns(self):        return []    def _proxy_column(self, column):        return column._make_proxy(self)class _TextFromClause(FromClause):    __visit_name__ = 'fromclause'    def __init__(self, text):        self.name = textclass _BindParamClause(ClauseElement, _CompareMixin):    """Represent a bind parameter.    Public constructor is the ``bindparam()`` function.    """    __visit_name__ = 'bindparam'    def __init__(self, key, value, type_=None, unique=False, isoutparam=False, shortname=None):        """Construct a _BindParamClause.        key          the key for this bind param.  Will be used in the generated          SQL statement for dialects that use named parameters.  This          value may be modified when part of a compilation operation,          if other ``_BindParamClause`` objects exist with the same          key, or if its length is too long and truncation is          required.        value          Initial value for this bind param.  This value may be          overridden by the dictionary of parameters sent to statement          compilation/execution.        shortname          deprecated.        type\_          A ``TypeEngine`` object that will be used to pre-process the          value corresponding to this ``_BindParamClause`` at          execution time.        unique          if True, the key name of this BindParamClause will be          modified if another ``_BindParamClause`` of the same name          already has been located within the containing          ``ClauseElement``.        isoutparam          if True, the parameter should be treated like a stored procedure "OUT"          parameter.        """        if unique:            self.key = "{ANON %d %s}" % (id(self), key or 'param')        else:            self.key = key or "{ANON %d param}" % id(self)        self._orig_key = key or 'param'        self.unique = unique        self.value = value        self.isoutparam = isoutparam        self.shortname = shortname        if type_ is None:            self.type = sqltypes.type_map.get(type(value), sqltypes.NullType)()        elif isinstance(type_, type):            self.type = type_()        else:            self.type = type_    def _clone(self):        c = ClauseElement._clone(self)        if self.unique:            c.key = "{ANON %d %s}" % (id(c), c._orig_key or 'param')        return c    def _convert_to_unique(self):        if not self.unique:            self.unique=True            self.key = "{ANON %d %s}" % (id(self), self._orig_key or 'param')    def _get_from_objects(self, **modifiers):        return []    def bind_processor(self, dialect):        return self.type.dialect_impl(dialect).bind_processor(dialect)    def _compare_type(self, obj):        if not isinstance(self.type, sqltypes.NullType):            return self.type        else:            return obj.type    def compare(self, other):        """Compare this ``_BindParamClause`` to the given clause.        Since ``compare()`` is meant to compare statement syntax, this        method returns True if the two ``_BindParamClauses`` have just        the same type.        """        return isinstance(other, _BindParamClause) and other.type.__class__ == self.type.__class__    def __repr__(self):        return "_BindParamClause(%s, %s, type_=%s)" % (repr(self.key), repr(self.value), repr(self.type))class _TypeClause(ClauseElement):    """Handle a type keyword in a SQL statement.    Used by the ``Case`` statement.    """    __visit_name__ = 'typeclause'    def __init__(self, type):        self.type = type    def _get_from_objects(self, **modifiers):        return []class _TextClause(ClauseElement):    """Represent a literal SQL text fragment.    Public constructor is the ``text()`` function.    """    __visit_name__ = 'textclause'    _bind_params_regex = re.compile(r'(?<![:\w\x5c]):(\w+)(?!:)', re.UNICODE)    def __init__(self, text = "", bind=None, bindparams=None, typemap=None, autocommit=False):        self._bind = bind        self.bindparams = {}        self.typemap = typemap        self._autocommit = autocommit        if typemap is not None:            for key in typemap.keys():                typemap[key] = sqltypes.to_instance(typemap[key])        def repl(m):            self.bindparams[m.group(1)] = bindparam(m.group(1))            return ":%s" % m.group(1)        # scan the string and search for bind parameter names, add them        # to the list of bindparams        self.text = self._bind_params_regex.sub(repl, text)        if bindparams is not None:            for b in bindparams:                self.bindparams[b.key] = b    def type(self):        if self.typemap is not None and len(self.typemap) == 1:            return list(self.typemap)[0]        else:            return None    type = property(type)    def _copy_internals(self, clone=_clone):        self.bindparams = dict([(b.key, clone(b)) for b in self.bindparams.values()])    def get_children(self, **kwargs):        return self.bindparams.values()    def _get_from_objects(self, *

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -