📄 properties.py
字号:
def any(self, criterion=None, **kwargs): if not self.prop.uselist: raise exceptions.InvalidRequestError("'any()' not implemented for scalar attributes. Use has().") j = self.prop.primaryjoin if self.prop.secondaryjoin: j = j & self.prop.secondaryjoin for k in kwargs: crit = (getattr(self.prop.mapper.class_, k) == kwargs[k]) if criterion is None: criterion = crit else: criterion = criterion & crit return sql.exists([1], j & criterion) def has(self, criterion=None, **kwargs): if self.prop.uselist: raise exceptions.InvalidRequestError("'has()' not implemented for collections. Use any().") j = self.prop.primaryjoin if self.prop.secondaryjoin: j = j & self.prop.secondaryjoin for k in kwargs: crit = (getattr(self.prop.mapper.class_, k) == kwargs[k]) if criterion is None: criterion = crit else: criterion = criterion & crit return sql.exists([1], j & criterion) def contains(self, other): if not self.prop.uselist: raise exceptions.InvalidRequestError("'contains' not implemented for scalar attributes. Use ==") clause = self.prop._optimized_compare(other) if self.prop.secondaryjoin: j = self.prop.primaryjoin j = j & self.prop.secondaryjoin clause.negation_clause = ~sql.exists([1], j & sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(other))])) return clause def __ne__(self, other): if self.prop.uselist and not hasattr(other, '__iter__'): raise exceptions.InvalidRequestError("Can only compare a collection to an iterable object") j = self.prop.primaryjoin if self.prop.secondaryjoin: j = j & self.prop.secondaryjoin return ~sql.exists([1], j & sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(other))])) def compare(self, op, value, value_is_parent=False): if op == operators.eq: if value is None: if self.uselist: return ~sql.exists([1], self.primaryjoin) else: return self._optimized_compare(None, value_is_parent=value_is_parent) else: return self._optimized_compare(value, value_is_parent=value_is_parent) else: return op(self.comparator, value) def _optimized_compare(self, value, value_is_parent=False): return self._get_strategy(strategies.LazyLoader).lazy_clause(value, reverse_direction=not value_is_parent) def private(self): return self.cascade.delete_orphan private = property(private) def create_strategy(self): if self.strategy_class: return self.strategy_class(self) elif self.lazy == 'dynamic': return strategies.DynaLoader(self) elif self.lazy: return strategies.LazyLoader(self) elif self.lazy is False: return strategies.EagerLoader(self) elif self.lazy is None: return strategies.NoLoader(self) def __str__(self): return str(self.parent.class_.__name__) + "." + self.key + " (" + str(self.mapper.class_.__name__) + ")" def merge(self, session, source, dest, dont_load, _recursive): if not dont_load and self._reverse_property and (source, self._reverse_property) in _recursive: return if not "merge" in self.cascade: # TODO: lazy callable should merge to the new instance dest._state.expire_attributes([self.key]) return instances = attributes.get_as_list(source._state, self.key, passive=True) if not instances: return if self.uselist: dest_list = attributes.init_collection(dest, self.key) for current in instances: _recursive[(current, self)] = True obj = session.merge(current, entity_name=self.mapper.entity_name, dont_load=dont_load, _recursive=_recursive) if obj is not None: if dont_load: dest_list.append_without_event(obj) else: dest_list.append_with_event(obj) else: current = instances[0] if current is not None: _recursive[(current, self)] = True obj = session.merge(current, entity_name=self.mapper.entity_name, dont_load=dont_load, _recursive=_recursive) if obj is not None: if dont_load: dest.__dict__[self.key] = obj else: setattr(dest, self.key, obj) def cascade_iterator(self, type, state, recursive, halt_on=None): if not type in self.cascade: return passive = type != 'delete' or self.passive_deletes mapper = self.mapper.primary_mapper() instances = attributes.get_as_list(state, self.key, passive=passive) if instances: for c in instances: if c is not None and c not in recursive and (halt_on is None or not halt_on(c)): if not isinstance(c, self.mapper.class_): raise exceptions.AssertionError("Attribute '%s' on class '%s' doesn't handle objects of type '%s'" % (self.key, str(self.parent.class_), str(c.__class__))) recursive.add(c) # cascade using the mapper local to this object, so that its individual properties are located instance_mapper = object_mapper(c, entity_name=mapper.entity_name) yield (c, instance_mapper) for (c2, m) in instance_mapper.cascade_iterator(type, c._state, recursive): yield (c2, m) def _get_target_class(self): """Return the target class of the relation, even if the property has not been initialized yet. """ if isinstance(self.argument, type): return self.argument else: return self.argument.class_ def do_init(self): self._determine_targets() self._determine_joins() self._determine_fks() self._determine_direction() self._determine_remote_side() self._create_polymorphic_joins() self._post_init() def _determine_targets(self): if isinstance(self.argument, type): self.mapper = mapper.class_mapper(self.argument, entity_name=self.entity_name, compile=False) elif isinstance(self.argument, mapper.Mapper): self.mapper = self.argument else: raise exceptions.ArgumentError("relation '%s' expects a class or a mapper argument (received: %s)" % (self.key, type(self.argument))) # ensure the "select_mapper", if different from the regular target mapper, is compiled. self.mapper.get_select_mapper() if not self.parent.concrete: for inheriting in self.parent.iterate_to_root(): if inheriting is not self.parent and inheriting._get_property(self.key, raiseerr=False): util.warn( ("Warning: relation '%s' on mapper '%s' supercedes " "the same relation on inherited mapper '%s'; this " "can cause dependency issues during flush") % (self.key, self.parent, inheriting)) if self.association is not None: if isinstance(self.association, type): self.association = mapper.class_mapper(self.association, entity_name=self.entity_name, compile=False) self.target = self.mapper.mapped_table self.select_mapper = self.mapper.get_select_mapper() self.select_table = self.mapper.select_table self.loads_polymorphic = self.target is not self.select_table if self.cascade.delete_orphan: if self.parent.class_ is self.mapper.class_: raise exceptions.ArgumentError("In relationship '%s', can't establish 'delete-orphan' cascade rule on a self-referential relationship. You probably want cascade='all', which includes delete cascading but not orphan detection." %(str(self))) self.mapper.primary_mapper().delete_orphans.append((self.key, self.parent.class_)) def _determine_joins(self): if self.secondaryjoin is not None and self.secondary is None: raise exceptions.ArgumentError("Property '" + self.key + "' specified with secondary join condition but no secondary argument") # if join conditions were not specified, figure them out based on foreign keys def _search_for_join(mapper, table): """find a join between the given mapper's mapped table and the given table. will try the mapper's local table first for more specificity, then if not found will try the more general mapped table, which in the case of inheritance is a join.""" try: return sql.join(mapper.local_table, table) except exceptions.ArgumentError, e: return sql.join(mapper.mapped_table, table) try: if self.secondary is not None: if self.secondaryjoin is None: self.secondaryjoin = _search_for_join(self.mapper, self.secondary).onclause if self.primaryjoin is None: self.primaryjoin = _search_for_join(self.parent, self.secondary).onclause else: if self.primaryjoin is None: self.primaryjoin = _search_for_join(self.parent, self.target).onclause except exceptions.ArgumentError, e: raise exceptions.ArgumentError("""Error determining primary and/or secondary join for relationship '%s'. If the underlying error cannot be corrected, you should specify the 'primaryjoin' (and 'secondaryjoin', if there is an association table present) keyword arguments to the relation() function (or for backrefs, by specifying the backref using the backref() function with keyword arguments) to explicitly specify the join conditions. Nested error is \"%s\"""" % (str(self), str(e))) # if using polymorphic mapping, the join conditions must be agasint the base tables of the mappers, # as the loader strategies expect to be working with those now (they will adapt the join conditions # to the "polymorphic" selectable as needed). since this is an API change, put an explicit check/ # error message in case its the "old" way. if self.loads_polymorphic: vis = ColumnsInClause(self.mapper.select_table) vis.traverse(self.primaryjoin) if self.secondaryjoin: vis.traverse(self.secondaryjoin) if vis.result: raise exceptions.ArgumentError("In relationship '%s', primary and secondary join conditions must not include columns from the polymorphic 'select_table' argument as of SA release 0.3.4. Construct join conditions using the base tables of the related mappers." % (str(self))) def _col_is_part_of_mappings(self, column): if self.secondary is None: return self.parent.mapped_table.c.contains_column(column) or \ self.target.c.contains_column(column) else: return self.parent.mapped_table.c.contains_column(column) or \ self.target.c.contains_column(column) or \ self.secondary.c.contains_column(column) is not None def _determine_fks(self): if self._legacy_foreignkey and not self._is_self_referential(): self.foreign_keys = self._legacy_foreignkey if self.foreign_keys: self._opposite_side = util.Set() def visit_binary(binary): if binary.operator != operators.eq or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column): return if binary.left in self.foreign_keys: self._opposite_side.add(binary.right) if binary.right in self.foreign_keys: self._opposite_side.add(binary.left) visitors.traverse(self.primaryjoin, visit_binary=visit_binary) if self.secondaryjoin is not None: visitors.traverse(self.secondaryjoin, visit_binary=visit_binary) else: self.foreign_keys = util.Set() self._opposite_side = util.Set() def visit_binary(binary): if binary.operator != operators.eq or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column): return # this check is for when the user put the "view_only" flag on and has tables that have nothing # to do with the relationship's parent/child mappings in the join conditions. we dont want cols # or clauses related to those external tables dealt with. see orm.relationships.ViewOnlyTest if not self._col_is_part_of_mappings(binary.left) or not self._col_is_part_of_mappings(binary.right): return for f in binary.left.foreign_keys: if f.references(binary.right.table):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -