⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 properties.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
        def any(self, criterion=None, **kwargs):            if not self.prop.uselist:                raise exceptions.InvalidRequestError("'any()' not implemented for scalar attributes. Use has().")            j = self.prop.primaryjoin            if self.prop.secondaryjoin:                j = j & self.prop.secondaryjoin            for k in kwargs:                crit = (getattr(self.prop.mapper.class_, k) == kwargs[k])                if criterion is None:                    criterion = crit                else:                    criterion = criterion & crit            return sql.exists([1], j & criterion)        def has(self, criterion=None, **kwargs):            if self.prop.uselist:                raise exceptions.InvalidRequestError("'has()' not implemented for collections.  Use any().")            j = self.prop.primaryjoin            if self.prop.secondaryjoin:                j = j & self.prop.secondaryjoin            for k in kwargs:                crit = (getattr(self.prop.mapper.class_, k) == kwargs[k])                if criterion is None:                    criterion = crit                else:                    criterion = criterion & crit            return sql.exists([1], j & criterion)        def contains(self, other):            if not self.prop.uselist:                raise exceptions.InvalidRequestError("'contains' not implemented for scalar attributes.  Use ==")            clause = self.prop._optimized_compare(other)            if self.prop.secondaryjoin:                j = self.prop.primaryjoin                j = j & self.prop.secondaryjoin                clause.negation_clause = ~sql.exists([1], j & sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(other))]))            return clause        def __ne__(self, other):            if self.prop.uselist and not hasattr(other, '__iter__'):                raise exceptions.InvalidRequestError("Can only compare a collection to an iterable object")            j = self.prop.primaryjoin            if self.prop.secondaryjoin:                j = j & self.prop.secondaryjoin            return ~sql.exists([1], j & sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(other))]))    def compare(self, op, value, value_is_parent=False):        if op == operators.eq:            if value is None:                if self.uselist:                    return ~sql.exists([1], self.primaryjoin)                else:                    return self._optimized_compare(None, value_is_parent=value_is_parent)            else:                return self._optimized_compare(value, value_is_parent=value_is_parent)        else:            return op(self.comparator, value)    def _optimized_compare(self, value, value_is_parent=False):        return self._get_strategy(strategies.LazyLoader).lazy_clause(value, reverse_direction=not value_is_parent)    def private(self):        return self.cascade.delete_orphan    private = property(private)    def create_strategy(self):        if self.strategy_class:            return self.strategy_class(self)        elif self.lazy == 'dynamic':            return strategies.DynaLoader(self)        elif self.lazy:            return strategies.LazyLoader(self)        elif self.lazy is False:            return strategies.EagerLoader(self)        elif self.lazy is None:            return strategies.NoLoader(self)    def __str__(self):        return str(self.parent.class_.__name__) + "." + self.key + " (" + str(self.mapper.class_.__name__)  + ")"    def merge(self, session, source, dest, dont_load, _recursive):        if not dont_load and self._reverse_property and (source, self._reverse_property) in _recursive:            return                    if not "merge" in self.cascade:            # TODO: lazy callable should merge to the new instance            dest._state.expire_attributes([self.key])            return        instances = attributes.get_as_list(source._state, self.key, passive=True)        if not instances:            return                if self.uselist:            dest_list = attributes.init_collection(dest, self.key)            for current in instances:                _recursive[(current, self)] = True                obj = session.merge(current, entity_name=self.mapper.entity_name, dont_load=dont_load, _recursive=_recursive)                if obj is not None:                    if dont_load:                        dest_list.append_without_event(obj)                    else:                        dest_list.append_with_event(obj)        else:            current = instances[0]            if current is not None:                _recursive[(current, self)] = True                obj = session.merge(current, entity_name=self.mapper.entity_name, dont_load=dont_load, _recursive=_recursive)                if obj is not None:                    if dont_load:                        dest.__dict__[self.key] = obj                    else:                        setattr(dest, self.key, obj)    def cascade_iterator(self, type, state, recursive, halt_on=None):        if not type in self.cascade:            return        passive = type != 'delete' or self.passive_deletes        mapper = self.mapper.primary_mapper()        instances = attributes.get_as_list(state, self.key, passive=passive)        if instances:            for c in instances:                if c is not None and c not in recursive and (halt_on is None or not halt_on(c)):                    if not isinstance(c, self.mapper.class_):                        raise exceptions.AssertionError("Attribute '%s' on class '%s' doesn't handle objects of type '%s'" % (self.key, str(self.parent.class_), str(c.__class__)))                    recursive.add(c)                    # cascade using the mapper local to this object, so that its individual properties are located                    instance_mapper = object_mapper(c, entity_name=mapper.entity_name)                    yield (c, instance_mapper)                    for (c2, m) in instance_mapper.cascade_iterator(type, c._state, recursive):                        yield (c2, m)    def _get_target_class(self):        """Return the target class of the relation, even if the        property has not been initialized yet.        """        if isinstance(self.argument, type):            return self.argument        else:            return self.argument.class_    def do_init(self):        self._determine_targets()        self._determine_joins()        self._determine_fks()        self._determine_direction()        self._determine_remote_side()        self._create_polymorphic_joins()        self._post_init()    def _determine_targets(self):        if isinstance(self.argument, type):            self.mapper = mapper.class_mapper(self.argument, entity_name=self.entity_name, compile=False)        elif isinstance(self.argument, mapper.Mapper):            self.mapper = self.argument        else:            raise exceptions.ArgumentError("relation '%s' expects a class or a mapper argument (received: %s)" % (self.key, type(self.argument)))        # ensure the "select_mapper", if different from the regular target mapper, is compiled.        self.mapper.get_select_mapper()        if not self.parent.concrete:            for inheriting in self.parent.iterate_to_root():                if inheriting is not self.parent and inheriting._get_property(self.key, raiseerr=False):                    util.warn(                        ("Warning: relation '%s' on mapper '%s' supercedes "                         "the same relation on inherited mapper '%s'; this "                         "can cause dependency issues during flush") %                        (self.key, self.parent, inheriting))        if self.association is not None:            if isinstance(self.association, type):                self.association = mapper.class_mapper(self.association, entity_name=self.entity_name, compile=False)        self.target = self.mapper.mapped_table        self.select_mapper = self.mapper.get_select_mapper()        self.select_table = self.mapper.select_table        self.loads_polymorphic = self.target is not self.select_table        if self.cascade.delete_orphan:            if self.parent.class_ is self.mapper.class_:                raise exceptions.ArgumentError("In relationship '%s', can't establish 'delete-orphan' cascade rule on a self-referential relationship.  You probably want cascade='all', which includes delete cascading but not orphan detection." %(str(self)))            self.mapper.primary_mapper().delete_orphans.append((self.key, self.parent.class_))    def _determine_joins(self):        if self.secondaryjoin is not None and self.secondary is None:            raise exceptions.ArgumentError("Property '" + self.key + "' specified with secondary join condition but no secondary argument")        # if join conditions were not specified, figure them out based on foreign keys        def _search_for_join(mapper, table):            """find a join between the given mapper's mapped table and the given table.            will try the mapper's local table first for more specificity, then if not            found will try the more general mapped table, which in the case of inheritance            is a join."""            try:                return sql.join(mapper.local_table, table)            except exceptions.ArgumentError, e:                return sql.join(mapper.mapped_table, table)        try:            if self.secondary is not None:                if self.secondaryjoin is None:                    self.secondaryjoin = _search_for_join(self.mapper, self.secondary).onclause                if self.primaryjoin is None:                    self.primaryjoin = _search_for_join(self.parent, self.secondary).onclause            else:                if self.primaryjoin is None:                    self.primaryjoin = _search_for_join(self.parent, self.target).onclause        except exceptions.ArgumentError, e:            raise exceptions.ArgumentError("""Error determining primary and/or secondary join for relationship '%s'. If the underlying error cannot be corrected, you should specify the 'primaryjoin' (and 'secondaryjoin', if there is an association table present) keyword arguments to the relation() function (or for backrefs, by specifying the backref using the backref() function with keyword arguments) to explicitly specify the join conditions. Nested error is \"%s\"""" % (str(self), str(e)))        # if using polymorphic mapping, the join conditions must be agasint the base tables of the mappers,        # as the loader strategies expect to be working with those now (they will adapt the join conditions        # to the "polymorphic" selectable as needed).  since this is an API change, put an explicit check/        # error message in case its the "old" way.        if self.loads_polymorphic:            vis = ColumnsInClause(self.mapper.select_table)            vis.traverse(self.primaryjoin)            if self.secondaryjoin:                vis.traverse(self.secondaryjoin)            if vis.result:                raise exceptions.ArgumentError("In relationship '%s', primary and secondary join conditions must not include columns from the polymorphic 'select_table' argument as of SA release 0.3.4.  Construct join conditions using the base tables of the related mappers." % (str(self)))    def _col_is_part_of_mappings(self, column):        if self.secondary is None:            return self.parent.mapped_table.c.contains_column(column) or \                self.target.c.contains_column(column)        else:            return self.parent.mapped_table.c.contains_column(column) or \                self.target.c.contains_column(column) or \                self.secondary.c.contains_column(column) is not None            def _determine_fks(self):        if self._legacy_foreignkey and not self._is_self_referential():            self.foreign_keys = self._legacy_foreignkey        if self.foreign_keys:            self._opposite_side = util.Set()            def visit_binary(binary):                if binary.operator != operators.eq or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column):                    return                if binary.left in self.foreign_keys:                    self._opposite_side.add(binary.right)                if binary.right in self.foreign_keys:                    self._opposite_side.add(binary.left)            visitors.traverse(self.primaryjoin, visit_binary=visit_binary)            if self.secondaryjoin is not None:                visitors.traverse(self.secondaryjoin, visit_binary=visit_binary)        else:            self.foreign_keys = util.Set()            self._opposite_side = util.Set()            def visit_binary(binary):                if binary.operator != operators.eq or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column):                    return                # this check is for when the user put the "view_only" flag on and has tables that have nothing                # to do with the relationship's parent/child mappings in the join conditions.  we dont want cols                # or clauses related to those external tables dealt with.  see orm.relationships.ViewOnlyTest                if not self._col_is_part_of_mappings(binary.left) or not self._col_is_part_of_mappings(binary.right):                    return                for f in binary.left.foreign_keys:                    if f.references(binary.right.table):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -