📄 schema.py
字号:
def references(self, table): """Return True if the given table is referenced by this ``ForeignKey``.""" return table.corresponding_column(self.column) is not None def get_referent(self, table): """return the column in the given table referenced by this ``ForeignKey``, or None if this ``ForeignKey`` does not reference the given table. """ return table.corresponding_column(self.column) def column(self): # ForeignKey inits its remote column as late as possible, so tables can # be defined without dependencies if self._column is None: if isinstance(self._colspec, basestring): # locate the parent table this foreign key is attached to. # we use the "original" column which our parent column represents # (its a list of columns/other ColumnElements if the parent table is a UNION) for c in self.parent.base_columns: if isinstance(c, Column): parenttable = c.table break else: raise exceptions.ArgumentError("Parent column '%s' does not descend from a table-attached Column" % str(self.parent)) m = re.match(r"^(.+?)(?:\.(.+?))?(?:\.(.+?))?$", self._colspec, re.UNICODE) if m is None: raise exceptions.ArgumentError("Invalid foreign key column specification: " + self._colspec) if m.group(3) is None: (tname, colname) = m.group(1, 2) schema = None else: (schema,tname,colname) = m.group(1,2,3) if _get_table_key(tname, schema) not in parenttable.metadata: raise exceptions.InvalidRequestError("Could not find table '%s' with which to generate a foreign key" % tname) table = Table(tname, parenttable.metadata, mustexist=True, schema=schema) try: if colname is None: # colname is None in the case that ForeignKey argument was specified # as table name only, in which case we match the column name to the same # column on the parent. key = self.parent self._column = table.c[self.parent.key] else: self._column = table.c[colname] except KeyError, e: raise exceptions.ArgumentError("Could not create ForeignKey '%s' on table '%s': table '%s' has no column named '%s'" % (self._colspec, parenttable.name, table.name, str(e))) else: self._column = self._colspec # propigate TypeEngine to parent if it didnt have one if isinstance(self.parent.type, types.NullType): self.parent.type = self._column.type return self._column column = property(column) def _set_parent(self, column): self.parent = column if self.parent._pre_existing_column is not None: # remove existing FK which matches us for fk in self.parent._pre_existing_column.foreign_keys: if fk._colspec == self._colspec: self.parent.table.foreign_keys.remove(fk) self.parent.table.constraints.remove(fk.constraint) if self.constraint is None and isinstance(self.parent.table, Table): self.constraint = ForeignKeyConstraint([],[], use_alter=self.use_alter, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, deferrable=self.deferrable, initially=self.initially) self.parent.table.append_constraint(self.constraint) self.constraint._append_fk(self) self.parent.foreign_keys.add(self) self.parent.table.foreign_keys.add(self)class DefaultGenerator(SchemaItem): """Base class for column *default* values.""" def __init__(self, for_update=False, metadata=None): self.for_update = for_update self.metadata = util.assert_arg_type(metadata, (MetaData, type(None)), 'metadata') def _set_parent(self, column): self.column = column self.metadata = self.column.table.metadata if self.for_update: self.column.onupdate = self else: self.column.default = self def execute(self, bind=None, **kwargs): if bind is None: bind = _bind_or_error(self) return bind._execute_default(self, **kwargs) def __repr__(self): return "DefaultGenerator()"class PassiveDefault(DefaultGenerator): """A default that takes effect on the database side.""" def __init__(self, arg, **kwargs): super(PassiveDefault, self).__init__(**kwargs) self.arg = arg def __repr__(self): return "PassiveDefault(%s)" % repr(self.arg)class ColumnDefault(DefaultGenerator): """A plain default value on a column. This could correspond to a constant, a callable function, or a SQL clause. """ def __init__(self, arg, **kwargs): super(ColumnDefault, self).__init__(**kwargs) if callable(arg): arg = self._maybe_wrap_callable(arg) self.arg = arg def _maybe_wrap_callable(self, fn): """Backward compat: Wrap callables that don't accept a context.""" if inspect.isfunction(fn): inspectable = fn elif inspect.isclass(fn): inspectable = fn.__init__ elif hasattr(fn, '__call__'): inspectable = fn.__call__ else: # probably not inspectable, try anyways. inspectable = fn try: argspec = inspect.getargspec(inspectable) except TypeError: return lambda ctx: fn() positionals = len(argspec[0]) if inspect.ismethod(inspectable): positionals -= 1 if positionals == 0: return lambda ctx: fn() defaulted = argspec[3] is not None and len(argspec[3]) or 0 if positionals - defaulted > 1: raise exceptions.ArgumentError( "ColumnDefault Python function takes zero or one " "positional arguments") return fn def _visit_name(self): if self.for_update: return "column_onupdate" else: return "column_default" __visit_name__ = property(_visit_name) def __repr__(self): return "ColumnDefault(%s)" % repr(self.arg)class Sequence(DefaultGenerator): """Represents a named sequence.""" def __init__(self, name, start=None, increment=None, schema=None, optional=False, quote=False, **kwargs): super(Sequence, self).__init__(**kwargs) self.name = name self.start = start self.increment = increment self.optional=optional self.quote = quote self.schema = schema self.kwargs = kwargs def __repr__(self): return "Sequence(%s)" % ', '.join( [repr(self.name)] + ["%s=%s" % (k, repr(getattr(self, k))) for k in ['start', 'increment', 'optional']]) def _set_parent(self, column): super(Sequence, self)._set_parent(column) column.sequence = self def create(self, bind=None, checkfirst=True): """Creates this sequence in the database.""" if bind is None: bind = _bind_or_error(self) bind.create(self, checkfirst=checkfirst) def drop(self, bind=None, checkfirst=True): """Drops this sequence from the database.""" if bind is None: bind = _bind_or_error(self) bind.drop(self, checkfirst=checkfirst)class Constraint(SchemaItem): """Represent a table-level ``Constraint`` such as a composite primary key, foreign key, or unique constraint. Implements a hybrid of dict/setlike behavior with regards to the list of underying columns. """ def __init__(self, name=None, deferrable=None, initially=None): self.name = name self.columns = expression.ColumnCollection() self.deferrable = deferrable self.initially = initially def __contains__(self, x): return self.columns.contains_column(x) def keys(self): return self.columns.keys() def __add__(self, other): return self.columns + other def __iter__(self): return iter(self.columns) def __len__(self): return len(self.columns) def copy(self): raise NotImplementedError()class CheckConstraint(Constraint): def __init__(self, sqltext, name=None, deferrable=None, initially=None): super(CheckConstraint, self).__init__(name, deferrable, initially) self.sqltext = sqltext def __visit_name__(self): if isinstance(self.parent, Table): return "check_constraint" else: return "column_check_constraint" __visit_name__ = property(__visit_name__) def _set_parent(self, parent): self.parent = parent parent.constraints.add(self) def copy(self): return CheckConstraint(self.sqltext, name=self.name)class ForeignKeyConstraint(Constraint): """Table-level foreign key constraint, represents a collection of ``ForeignKey`` objects.""" def __init__(self, columns, refcolumns, name=None, onupdate=None, ondelete=None, use_alter=False, deferrable=None, initially=None): super(ForeignKeyConstraint, self).__init__(name, deferrable, initially) self.__colnames = columns self.__refcolnames = refcolumns self.elements = util.OrderedSet() self.onupdate = onupdate self.ondelete = ondelete if self.name is None and use_alter: raise exceptions.ArgumentError("Alterable ForeignKey/ForeignKeyConstraint requires a name") self.use_alter = use_alter def _set_parent(self, table): self.table = table if self not in table.constraints: table.constraints.add(self) for (c, r) in zip(self.__colnames, self.__refcolnames): self.append_element(c,r) def append_element(self, col, refcol): fk = ForeignKey(refcol, constraint=self, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter) fk._set_parent(self.table.c[col]) self._append_fk(fk) def _append_fk(self, fk): self.columns.add(self.table.c[fk.parent.key]) self.elements.add(fk) def copy(self): return ForeignKeyConstraint([x.parent.name for x in self.elements], [x._get_colspec() for x in self.elements], name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter)class PrimaryKeyConstraint(Constraint): def __init__(self, *columns, **kwargs): constraint_args = dict(name=kwargs.pop('name', None), deferrable=kwargs.pop('deferrable', None), initially=kwargs.pop('initially', None)) if kwargs: raise exceptions.ArgumentError( 'Unknown PrimaryKeyConstraint argument(s): %s' % ', '.join([repr(x) for x in kwargs.keys()])) super(PrimaryKeyConstraint, self).__init__(**constraint_args) self.__colnames = list(columns) def _set_parent(self, table): self.table = table table.primary_key = self for name in self.__colnames: self.add(table.c[name]) def add(self, col): self.columns.add(col) col.primary_key=True append_column = add def replace(self, col): self.columns.replace(col) def remove(self, col): col.primary_key=False del self.columns[col.key] def copy(self): return PrimaryKeyConstraint(name=self.name, *[c.key for c in self]) def __eq__(self, other): return self.columns == otherclass UniqueConstraint(Constraint): def __init__(self, *columns, **kwargs): constraint_args = dict(name=kwargs.pop('name', None), deferrable=kwargs.pop('deferrable', None), initially=kwargs.pop('initially', None)) if kwargs: raise exceptions.ArgumentError( 'Unknown UniqueConstraint argument(s): %s' % ', '.join([repr(x) for x in kwargs.keys()])) super(UniqueConstraint, self).__init__(**constraint_args) self.__colnames = list(columns) def _set_parent(self, table): self.table = table table.constraints.add(self) for c in self.__colnames: self.append_column(table.c[c])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -