⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 schema.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
    def references(self, table):        """Return True if the given table is referenced by this ``ForeignKey``."""        return table.corresponding_column(self.column) is not None    def get_referent(self, table):        """return the column in the given table referenced by this ``ForeignKey``, or        None if this ``ForeignKey`` does not reference the given table.        """        return table.corresponding_column(self.column)    def column(self):        # ForeignKey inits its remote column as late as possible, so tables can        # be defined without dependencies        if self._column is None:            if isinstance(self._colspec, basestring):                # locate the parent table this foreign key is attached to.                # we use the "original" column which our parent column represents                # (its a list of columns/other ColumnElements if the parent table is a UNION)                for c in self.parent.base_columns:                    if isinstance(c, Column):                        parenttable = c.table                        break                else:                    raise exceptions.ArgumentError("Parent column '%s' does not descend from a table-attached Column" % str(self.parent))                m = re.match(r"^(.+?)(?:\.(.+?))?(?:\.(.+?))?$", self._colspec, re.UNICODE)                if m is None:                    raise exceptions.ArgumentError("Invalid foreign key column specification: " + self._colspec)                if m.group(3) is None:                    (tname, colname) = m.group(1, 2)                    schema = None                else:                    (schema,tname,colname) = m.group(1,2,3)                if _get_table_key(tname, schema) not in parenttable.metadata:                    raise exceptions.InvalidRequestError("Could not find table '%s' with which to generate a foreign key" % tname)                table = Table(tname, parenttable.metadata, mustexist=True, schema=schema)                try:                    if colname is None:                        # colname is None in the case that ForeignKey argument was specified                        # as table name only, in which case we match the column name to the same                        # column on the parent.                        key = self.parent                        self._column = table.c[self.parent.key]                    else:                        self._column = table.c[colname]                except KeyError, e:                    raise exceptions.ArgumentError("Could not create ForeignKey '%s' on table '%s': table '%s' has no column named '%s'" % (self._colspec, parenttable.name, table.name, str(e)))            else:                self._column = self._colspec        # propigate TypeEngine to parent if it didnt have one        if isinstance(self.parent.type, types.NullType):            self.parent.type = self._column.type        return self._column    column = property(column)    def _set_parent(self, column):        self.parent = column        if self.parent._pre_existing_column is not None:            # remove existing FK which matches us            for fk in self.parent._pre_existing_column.foreign_keys:                if fk._colspec == self._colspec:                    self.parent.table.foreign_keys.remove(fk)                    self.parent.table.constraints.remove(fk.constraint)        if self.constraint is None and isinstance(self.parent.table, Table):            self.constraint = ForeignKeyConstraint([],[], use_alter=self.use_alter, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, deferrable=self.deferrable, initially=self.initially)            self.parent.table.append_constraint(self.constraint)            self.constraint._append_fk(self)        self.parent.foreign_keys.add(self)        self.parent.table.foreign_keys.add(self)class DefaultGenerator(SchemaItem):    """Base class for column *default* values."""    def __init__(self, for_update=False, metadata=None):        self.for_update = for_update        self.metadata = util.assert_arg_type(metadata, (MetaData, type(None)), 'metadata')    def _set_parent(self, column):        self.column = column        self.metadata = self.column.table.metadata        if self.for_update:            self.column.onupdate = self        else:            self.column.default = self    def execute(self, bind=None, **kwargs):        if bind is None:            bind = _bind_or_error(self)        return bind._execute_default(self, **kwargs)    def __repr__(self):        return "DefaultGenerator()"class PassiveDefault(DefaultGenerator):    """A default that takes effect on the database side."""    def __init__(self, arg, **kwargs):        super(PassiveDefault, self).__init__(**kwargs)        self.arg = arg    def __repr__(self):        return "PassiveDefault(%s)" % repr(self.arg)class ColumnDefault(DefaultGenerator):    """A plain default value on a column.    This could correspond to a constant, a callable function, or a SQL    clause.    """    def __init__(self, arg, **kwargs):        super(ColumnDefault, self).__init__(**kwargs)        if callable(arg):            arg = self._maybe_wrap_callable(arg)        self.arg = arg    def _maybe_wrap_callable(self, fn):        """Backward compat: Wrap callables that don't accept a context."""        if inspect.isfunction(fn):            inspectable = fn        elif inspect.isclass(fn):            inspectable = fn.__init__        elif hasattr(fn, '__call__'):            inspectable = fn.__call__        else:            # probably not inspectable, try anyways.            inspectable = fn        try:            argspec = inspect.getargspec(inspectable)        except TypeError:            return lambda ctx: fn()        positionals = len(argspec[0])        if inspect.ismethod(inspectable):            positionals -= 1        if positionals == 0:            return lambda ctx: fn()        defaulted = argspec[3] is not None and len(argspec[3]) or 0        if positionals - defaulted > 1:            raise exceptions.ArgumentError(                "ColumnDefault Python function takes zero or one "                "positional arguments")        return fn    def _visit_name(self):        if self.for_update:            return "column_onupdate"        else:            return "column_default"    __visit_name__ = property(_visit_name)    def __repr__(self):        return "ColumnDefault(%s)" % repr(self.arg)class Sequence(DefaultGenerator):    """Represents a named sequence."""    def __init__(self, name, start=None, increment=None, schema=None,                 optional=False, quote=False, **kwargs):        super(Sequence, self).__init__(**kwargs)        self.name = name        self.start = start        self.increment = increment        self.optional=optional        self.quote = quote        self.schema = schema        self.kwargs = kwargs    def __repr__(self):        return "Sequence(%s)" % ', '.join(            [repr(self.name)] +            ["%s=%s" % (k, repr(getattr(self, k)))             for k in ['start', 'increment', 'optional']])    def _set_parent(self, column):        super(Sequence, self)._set_parent(column)        column.sequence = self    def create(self, bind=None, checkfirst=True):        """Creates this sequence in the database."""        if bind is None:            bind = _bind_or_error(self)        bind.create(self, checkfirst=checkfirst)    def drop(self, bind=None, checkfirst=True):        """Drops this sequence from the database."""        if bind is None:            bind = _bind_or_error(self)        bind.drop(self, checkfirst=checkfirst)class Constraint(SchemaItem):    """Represent a table-level ``Constraint`` such as a composite primary key, foreign key, or unique constraint.    Implements a hybrid of dict/setlike behavior with regards to the    list of underying columns.    """    def __init__(self, name=None, deferrable=None, initially=None):        self.name = name        self.columns = expression.ColumnCollection()        self.deferrable = deferrable        self.initially = initially    def __contains__(self, x):        return self.columns.contains_column(x)    def keys(self):        return self.columns.keys()    def __add__(self, other):        return self.columns + other    def __iter__(self):        return iter(self.columns)    def __len__(self):        return len(self.columns)    def copy(self):        raise NotImplementedError()class CheckConstraint(Constraint):    def __init__(self, sqltext, name=None, deferrable=None, initially=None):        super(CheckConstraint, self).__init__(name, deferrable, initially)        self.sqltext = sqltext    def __visit_name__(self):        if isinstance(self.parent, Table):            return "check_constraint"        else:            return "column_check_constraint"    __visit_name__ = property(__visit_name__)    def _set_parent(self, parent):        self.parent = parent        parent.constraints.add(self)    def copy(self):        return CheckConstraint(self.sqltext, name=self.name)class ForeignKeyConstraint(Constraint):    """Table-level foreign key constraint, represents a collection of ``ForeignKey`` objects."""    def __init__(self, columns, refcolumns, name=None, onupdate=None, ondelete=None, use_alter=False, deferrable=None, initially=None):        super(ForeignKeyConstraint, self).__init__(name, deferrable, initially)        self.__colnames = columns        self.__refcolnames = refcolumns        self.elements = util.OrderedSet()        self.onupdate = onupdate        self.ondelete = ondelete        if self.name is None and use_alter:            raise exceptions.ArgumentError("Alterable ForeignKey/ForeignKeyConstraint requires a name")        self.use_alter = use_alter    def _set_parent(self, table):        self.table = table        if self not in table.constraints:            table.constraints.add(self)            for (c, r) in zip(self.__colnames, self.__refcolnames):                self.append_element(c,r)    def append_element(self, col, refcol):        fk = ForeignKey(refcol, constraint=self, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter)        fk._set_parent(self.table.c[col])        self._append_fk(fk)    def _append_fk(self, fk):        self.columns.add(self.table.c[fk.parent.key])        self.elements.add(fk)    def copy(self):        return ForeignKeyConstraint([x.parent.name for x in self.elements], [x._get_colspec() for x in self.elements], name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter)class PrimaryKeyConstraint(Constraint):    def __init__(self, *columns, **kwargs):        constraint_args = dict(name=kwargs.pop('name', None),                               deferrable=kwargs.pop('deferrable', None),                               initially=kwargs.pop('initially', None))        if kwargs:            raise exceptions.ArgumentError(                'Unknown PrimaryKeyConstraint argument(s): %s' %                ', '.join([repr(x) for x in kwargs.keys()]))        super(PrimaryKeyConstraint, self).__init__(**constraint_args)        self.__colnames = list(columns)    def _set_parent(self, table):        self.table = table        table.primary_key = self        for name in self.__colnames:            self.add(table.c[name])    def add(self, col):        self.columns.add(col)        col.primary_key=True    append_column = add    def replace(self, col):        self.columns.replace(col)    def remove(self, col):        col.primary_key=False        del self.columns[col.key]    def copy(self):        return PrimaryKeyConstraint(name=self.name, *[c.key for c in self])    def __eq__(self, other):        return self.columns == otherclass UniqueConstraint(Constraint):    def __init__(self, *columns, **kwargs):        constraint_args = dict(name=kwargs.pop('name', None),                               deferrable=kwargs.pop('deferrable', None),                               initially=kwargs.pop('initially', None))        if kwargs:            raise exceptions.ArgumentError(                'Unknown UniqueConstraint argument(s): %s' %                ', '.join([repr(x) for x in kwargs.keys()]))        super(UniqueConstraint, self).__init__(**constraint_args)        self.__colnames = list(columns)    def _set_parent(self, table):        self.table = table        table.constraints.add(self)        for c in self.__colnames:            self.append_column(table.c[c])

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -