📄 mapper.py
字号:
class_mapper(cls) if cls.__dict__.get(clskey) is self: # FIXME: there should not be any scenarios where # a mapper compile leaves this CompileOnAttr in # place. util.warn( ("Attribute '%s' on class '%s' was not replaced during " "mapper compilation operation") % (clskey, cls.__name__)) # clean us up explicitly delattr(cls, clskey) return getattr(getattr(cls, clskey), key) def _compile_properties(self): # object attribute names mapped to MapperProperty objects self.__props = util.OrderedDict() # table columns mapped to lists of MapperProperty objects # using a list allows a single column to be defined as # populating multiple object attributes self._columntoproperty = {} # load custom properties if self._init_properties is not None: for key, prop in self._init_properties.iteritems(): self._compile_property(key, prop, False) # pull properties from the inherited mapper if any. if self.inherits is not None: for key, prop in self.inherits.__props.iteritems(): if key not in self.__props: self._adapt_inherited_property(key, prop) # create properties for each column in the mapped table, # for those columns which don't already map to a property for column in self.mapped_table.columns: if column in self._columntoproperty: continue if (self.include_properties is not None and column.key not in self.include_properties): self.__log("not including property %s" % (column.key)) continue if (self.exclude_properties is not None and column.key in self.exclude_properties): self.__log("excluding property %s" % (column.key)) continue column_key = (self.column_prefix or '') + column.key self._compile_property(column_key, column, init=False, setparent=True) def _adapt_inherited_property(self, key, prop): if not self.concrete: self._compile_property(key, prop, init=False, setparent=False) # TODO: concrete properties dont adapt at all right now....will require copies of relations() etc. def _compile_property(self, key, prop, init=True, setparent=True): self.__log("_compile_property(%s, %s)" % (key, prop.__class__.__name__)) if not isinstance(prop, MapperProperty): # we were passed a Column or a list of Columns; generate a ColumnProperty columns = util.to_list(prop) column = columns[0] if not expression.is_column(column): raise exceptions.ArgumentError("%s=%r is not an instance of MapperProperty or Column" % (key, prop)) prop = self.__props.get(key, None) if isinstance(prop, ColumnProperty): # TODO: the "property already exists" case is still not well defined here. # assuming single-column, etc. if prop.parent is not self: # existing ColumnProperty from an inheriting mapper. # make a copy and append our column to it prop = prop.copy() prop.columns.append(column) self.__log("appending to existing ColumnProperty %s" % (key)) elif prop is None: mapped_column = [] for c in columns: mc = self.mapped_table.corresponding_column(c) if not mc: raise exceptions.ArgumentError("Column '%s' is not represented in mapper's table. Use the `column_property()` function to force this column to be mapped as a read-only attribute." % str(c)) mapped_column.append(mc) prop = ColumnProperty(*mapped_column) else: if not self.allow_column_override: raise exceptions.ArgumentError("WARNING: column '%s' not being added due to property '%s'. Specify 'allow_column_override=True' to mapper() to ignore this condition." % (column.key, repr(prop))) else: return if isinstance(prop, ColumnProperty): col = self.mapped_table.corresponding_column(prop.columns[0]) # col might not be present! the selectable given to the mapper need not include "deferred" # columns (included in zblog tests) if col is None: col = prop.columns[0] self.columns[key] = col for col in prop.columns: for col in col.proxy_set: self._columntoproperty[col] = prop elif isinstance(prop, SynonymProperty): prop.instrument = getattr(self.class_, key, None) if isinstance(prop.instrument, Mapper._CompileOnAttr): prop.instrument = object.__getattribute__(prop.instrument, 'existing_prop') if prop.map_column: if not key in self.mapped_table.c: raise exceptions.ArgumentError("Can't compile synonym '%s': no column on table '%s' named '%s'" % (prop.name, self.mapped_table.description, key)) self._compile_property(prop.name, ColumnProperty(self.mapped_table.c[key]), init=init, setparent=setparent) self.__props[key] = prop if setparent: prop.set_parent(self) if not self.non_primary: setattr(self.class_, key, Mapper._CompileOnAttr(self.class_, key)) if init: prop.init(key, self) for mapper in self._inheriting_mappers: mapper._adapt_inherited_property(key, prop) def _compile_selectable(self): """If the 'select_table' keyword argument was specified, set up a second *surrogate mapper* that will be used for select operations. The columns of `select_table` should encompass all the columns of the `mapped_table` either directly or through proxying relationships. Currently, non-column properties are **not** copied. This implies that a polymorphic mapper can't do any eager loading right now. """ if self.select_table is not self.mapped_table: # turn a straight join into an aliased selectable if isinstance(self.select_table, sql.Join): self.select_table = self.select_table.select(use_labels=True).alias() self.__surrogate_mapper = Mapper(self.class_, self.select_table, non_primary=True, _polymorphic_map=self.polymorphic_map, polymorphic_on=_corresponding_column_or_error(self.select_table, self.polymorphic_on), primary_key=self.primary_key_argument) adapter = sqlutil.ClauseAdapter(self.select_table, equivalents=self.__surrogate_mapper._equivalent_columns) if self.order_by: order_by = [expression._literal_as_text(o) for o in util.to_list(self.order_by) or []] order_by = adapter.copy_and_process(order_by) self.__surrogate_mapper.order_by=order_by if self._init_properties is not None: for key, prop in self._init_properties.iteritems(): if expression.is_column(prop): self.__surrogate_mapper.add_property(key, _corresponding_column_or_error(self.select_table, prop)) elif (isinstance(prop, list) and expression.is_column(prop[0])): self.__surrogate_mapper.add_property(key, [_corresponding_column_or_error(self.select_table, c) for c in prop]) self.__surrogate_mapper._clause_adapter = adapter def _compile_class(self): """If this mapper is to be a primary mapper (i.e. the non_primary flag is not set), associate this Mapper with the given class_ and entity name. Subsequent calls to ``class_mapper()`` for the class_/entity name combination will return this mapper. Also decorate the `__init__` method on the mapped class to include optional auto-session attachment logic. """ if self.non_primary: self._class_state = self.class_._class_state _mapper_registry[self] = True return if not self.non_primary and '_class_state' in self.class_.__dict__ and (self.entity_name in self.class_._class_state.mappers): raise exceptions.ArgumentError("Class '%s' already has a primary mapper defined with entity name '%s'. Use non_primary=True to create a non primary Mapper. clear_mappers() will remove *all* current mappers from all classes." % (self.class_, self.entity_name)) def extra_init(class_, oldinit, instance, args, kwargs): self.compile() if 'init_instance' in self.extension.methods: self.extension.init_instance(self, class_, oldinit, instance, args, kwargs) def on_exception(class_, oldinit, instance, args, kwargs): util.warn_exception(self.extension.init_failed, self, class_, oldinit, instance, args, kwargs) attributes.register_class(self.class_, extra_init=extra_init, on_exception=on_exception, deferred_scalar_loader=_load_scalar_attributes) self._class_state = self.class_._class_state _mapper_registry[self] = True self.class_._class_state.mappers[self.entity_name] = self for ext in util.to_list(self.extension, []): ext.instrument_class(self, self.class_) if self.entity_name is None: self.class_.c = self.c def common_parent(self, other): """Return true if the given mapper shares a common inherited parent as this mapper.""" return self.base_mapper is other.base_mapper def isa(self, other): """Return True if the given mapper inherits from this mapper.""" m = other while m is not self and m.inherits is not None: m = m.inherits return m is self def iterate_to_root(self): m = self while m is not None: yield m m = m.inherits def polymorphic_iterator(self): """Iterate through the collection including this mapper and all descendant mappers. This includes not just the immediately inheriting mappers but all their inheriting mappers as well. To iterate through an entire hierarchy, use ``mapper.base_mapper.polymorphic_iterator()``.""" yield self for mapper in self._inheriting_mappers: for m in mapper.polymorphic_iterator(): yield m def add_properties(self, dict_of_properties): """Add the given dictionary of properties to this mapper, using `add_property`. """ for key, value in dict_of_properties.iteritems(): self.add_property(key, value) def add_property(self, key, prop): """Add an individual MapperProperty to this mapper. If the mapper has not been compiled yet, just adds the property to the initial properties dictionary sent to the constructor. If this Mapper has already been compiled, then the given MapperProperty is compiled immediately. """ self._init_properties[key] = prop self._compile_property(key, prop, init=self.__props_init) def __str__(self): return "Mapper|" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "") def primary_mapper(self): """Return the primary mapper corresponding to this mapper's class key (class + entity_name).""" return self._class_state.mappers[self.entity_name] def get_session(self): """Return the contextual session provided by the mapper extension chain, if any. Raise ``InvalidRequestError`` if a session cannot be retrieved from the extension chain. """ if 'get_session' in self.extension.methods: s = self.extension.get_session() if s is not EXT_CONTINUE: return s
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -