📄 expression.py
字号:
def __setitem__(self, key, value): if key in self: # this warning is primarily to catch select() statements which have conflicting # column names in their exported columns collection existing = self[key] if not existing.shares_lineage(value): table = getattr(existing, 'table', None) and existing.table.description util.warn(("Column %r on table %r being replaced by another " "column with the same key. Consider use_labels " "for select() statements.") % (key, table)) util.OrderedProperties.__setitem__(self, key, value) def remove(self, column): del self[column.key] def extend(self, iter): for c in iter: self.add(c) def __eq__(self, other): l = [] for c in other: for local in self: if c.shares_lineage(local): l.append(c==local) return and_(*l) def __contains__(self, other): if not isinstance(other, basestring): raise exceptions.ArgumentError("__contains__ requires a string argument") return util.OrderedProperties.__contains__(self, other) def contains_column(self, col): # have to use a Set here, because it will compare the identity # of the column, not just using "==" for comparison which will always return a # "True" value (i.e. a BinaryClause...) return col in util.Set(self)class ColumnSet(util.OrderedSet): def contains_column(self, col): return col in self def extend(self, cols): for col in cols: self.add(col) def __add__(self, other): return list(self) + list(other) def __eq__(self, other): l = [] for c in other: for local in self: if c.shares_lineage(local): l.append(c==local) return and_(*l)class Selectable(ClauseElement): """mark a class as being selectable"""class FromClause(Selectable): """Represent an element that can be used within the ``FROM`` clause of a ``SELECT`` statement.""" __visit_name__ = 'fromclause' named_with_column=False _hide_froms = [] def __init__(self): self.oid_column = None def _get_from_objects(self, **modifiers): return [] def default_order_by(self): return [self.oid_column] def count(self, whereclause=None, **params): """return a SELECT COUNT generated against this ``FromClause``.""" if self.primary_key: col = list(self.primary_key)[0] else: col = list(self.columns)[0] return select([func.count(col).label('tbl_row_count')], whereclause, from_obj=[self], **params) def select(self, whereclauses = None, **params): """return a SELECT of this ``FromClause``.""" return select([self], whereclauses, **params) def join(self, right, *args, **kwargs): """return a join of this ``FromClause`` against another ``FromClause``.""" return Join(self, right, *args, **kwargs) def outerjoin(self, right, *args, **kwargs): """return an outer join of this ``FromClause`` against another ``FromClause``.""" return Join(self, right, isouter=True, *args, **kwargs) def alias(self, name=None): """return an alias of this ``FromClause`` against another ``FromClause``.""" return Alias(self, name) def is_derived_from(self, fromclause): """Return True if this FromClause is 'derived' from the given FromClause. An example would be an Alias of a Table is derived from that Table. """ return fromclause in util.Set(self._cloned_set) def replace_selectable(self, old, alias): """replace all occurences of FromClause 'old' with the given Alias object, returning a copy of this ``FromClause``.""" global ClauseAdapter if ClauseAdapter is None: from sqlalchemy.sql.util import ClauseAdapter return ClauseAdapter(alias).traverse(self, clone=True) def corresponding_column(self, column, require_embedded=False): """Given a ``ColumnElement``, return the exported ``ColumnElement`` object from this ``Selectable`` which corresponds to that original ``Column`` via a common anscestor column. column the target ``ColumnElement`` to be matched require_embedded only return corresponding columns for the given ``ColumnElement``, if the given ``ColumnElement`` is actually present within a sub-element of this ``FromClause``. Normally the column will match if it merely shares a common anscestor with one of the exported columns of this ``FromClause``. """ # dont dig around if the column is locally present if self.c.contains_column(column): return column col, intersect = None, None target_set = column.proxy_set for c in self.c + [self.oid_column]: i = c.proxy_set.intersection(target_set) if i and \ (not require_embedded or c.proxy_set.issuperset(target_set)) and \ (intersect is None or len(i) > len(intersect)): col, intersect = c, i return col def description(self): """a brief description of this FromClause. Used primarily for error message formatting. """ return getattr(self, 'name', self.__class__.__name__ + " object") description = property(description) def _clone_from_clause(self): # delete all the "generated" collections of columns for a # newly cloned FromClause, so that they will be re-derived # from the item. this is because FromClause subclasses, when # cloned, need to reestablish new "proxied" columns that are # linked to the new item for attr in ('_columns', '_primary_key' '_foreign_keys', '_oid_column', '_embedded_columns', '_all_froms'): if hasattr(self, attr): delattr(self, attr) def _expr_attr_func(name): def attr(self): try: return getattr(self, name) except AttributeError: self._export_columns() return getattr(self, name) return property(attr) columns = c = _expr_attr_func('_columns') primary_key = _expr_attr_func('_primary_key') foreign_keys = _expr_attr_func('_foreign_keys') def _export_columns(self, columns=None): """Initialize column collections.""" if hasattr(self, '_columns') and columns is None: return self._columns = ColumnCollection() self._primary_key = ColumnSet() self._foreign_keys = util.Set() if columns is None: columns = self._flatten_exportable_columns() for co in columns: cp = self._proxy_column(co) def _flatten_exportable_columns(self): """Return the list of ColumnElements represented within this FromClause's _exportable_columns""" export = self._exportable_columns() for column in export: if isinstance(column, Selectable): for co in column.columns: yield co elif isinstance(column, ColumnElement): yield column else: continue def _exportable_columns(self): return [] def _proxy_column(self, column): return column._make_proxy(self)class _TextFromClause(FromClause): __visit_name__ = 'fromclause' def __init__(self, text): self.name = textclass _BindParamClause(ClauseElement, _CompareMixin): """Represent a bind parameter. Public constructor is the ``bindparam()`` function. """ __visit_name__ = 'bindparam' def __init__(self, key, value, type_=None, unique=False, isoutparam=False, shortname=None): """Construct a _BindParamClause. key the key for this bind param. Will be used in the generated SQL statement for dialects that use named parameters. This value may be modified when part of a compilation operation, if other ``_BindParamClause`` objects exist with the same key, or if its length is too long and truncation is required. value Initial value for this bind param. This value may be overridden by the dictionary of parameters sent to statement compilation/execution. shortname deprecated. type\_ A ``TypeEngine`` object that will be used to pre-process the value corresponding to this ``_BindParamClause`` at execution time. unique if True, the key name of this BindParamClause will be modified if another ``_BindParamClause`` of the same name already has been located within the containing ``ClauseElement``. isoutparam if True, the parameter should be treated like a stored procedure "OUT" parameter. """ if unique: self.key = "{ANON %d %s}" % (id(self), key or 'param') else: self.key = key or "{ANON %d param}" % id(self) self._orig_key = key or 'param' self.unique = unique self.value = value self.isoutparam = isoutparam self.shortname = shortname if type_ is None: self.type = sqltypes.type_map.get(type(value), sqltypes.NullType)() elif isinstance(type_, type): self.type = type_() else: self.type = type_ def _clone(self): c = ClauseElement._clone(self) if self.unique: c.key = "{ANON %d %s}" % (id(c), c._orig_key or 'param') return c def _convert_to_unique(self): if not self.unique: self.unique=True self.key = "{ANON %d %s}" % (id(self), self._orig_key or 'param') def _get_from_objects(self, **modifiers): return [] def bind_processor(self, dialect): return self.type.dialect_impl(dialect).bind_processor(dialect) def _compare_type(self, obj): if not isinstance(self.type, sqltypes.NullType): return self.type else: return obj.type def compare(self, other): """Compare this ``_BindParamClause`` to the given clause. Since ``compare()`` is meant to compare statement syntax, this method returns True if the two ``_BindParamClauses`` have just the same type. """ return isinstance(other, _BindParamClause) and other.type.__class__ == self.type.__class__ def __repr__(self): return "_BindParamClause(%s, %s, type_=%s)" % (repr(self.key), repr(self.value), repr(self.type))class _TypeClause(ClauseElement): """Handle a type keyword in a SQL statement. Used by the ``Case`` statement. """ __visit_name__ = 'typeclause' def __init__(self, type): self.type = type def _get_from_objects(self, **modifiers): return []class _TextClause(ClauseElement): """Represent a literal SQL text fragment. Public constructor is the ``text()`` function. """ __visit_name__ = 'textclause' _bind_params_regex = re.compile(r'(?<![:\w\x5c]):(\w+)(?!:)', re.UNICODE) def __init__(self, text = "", bind=None, bindparams=None, typemap=None, autocommit=False): self._bind = bind self.bindparams = {} self.typemap = typemap self._autocommit = autocommit if typemap is not None: for key in typemap.keys(): typemap[key] = sqltypes.to_instance(typemap[key]) def repl(m): self.bindparams[m.group(1)] = bindparam(m.group(1)) return ":%s" % m.group(1) # scan the string and search for bind parameter names, add them # to the list of bindparams self.text = self._bind_params_regex.sub(repl, text) if bindparams is not None: for b in bindparams: self.bindparams[b.key] = b def type(self): if self.typemap is not None and len(self.typemap) == 1: return list(self.typemap)[0] else: return None type = property(type) def _copy_internals(self, clone=_clone): self.bindparams = dict([(b.key, clone(b)) for b in self.bindparams.values()]) def get_children(self, **kwargs): return self.bindparams.values() def _get_from_objects(self, *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -