store.py
database. """ cls_info = obj_info.cls_info cached_primary_vars = obj_info.get("primary_vars") primary_key_idx = cls_info.primary_key_idx missing_columns = [] for column in cls_info.columns: variable = obj_info.variables[column] if not variable.is_defined(): idx = primary_key_idx.get(id(column)) lazy_value = variable.get_lazy() if (idx is not None and cached_primary_vars is not None and lazy_value): # XXX lazy_value might not be AutoReload. Test & fix this. # For auto-reloading a primary key, just get the # value out of the cache. variable.set(cached_primary_vars[idx].get()) elif (replace_lazy or lazy_value is None or lazy_value is AutoReload): missing_columns.append(column) if missing_columns: primary_key = cls_info.primary_key for variable in primary_vars: if not variable.is_defined(): # XXX Think about the case where the primary key is set # to a lazy value which isn't AutoReload. if result is None: raise RuntimeError("Can't find missing primary values " "without a meaningful result") where = result.get_insert_identity(primary_key, primary_vars) break else: where = compare_columns(primary_key, primary_vars) # This procedure will also validate the cache. result = self._connection.execute(Select(missing_columns, where)) self._set_values(obj_info, missing_columns, result, result.get_one()) obj_info.pop("invalidated", None) if checkpoint: for column in missing_columns: obj_info.variables[column].checkpoint() elif obj_info.get("invalidated"): # In case of no explicit missing values, enforce cache validation. # It might happen that the primary key was autoreloaded and # restored from the cache. where = compare_columns(cls_info.primary_key, primary_vars) result = self._connection.execute(Select(SQLRaw("1"), where)) if not result.get_one(): raise LostObjectError("Object is not in the database anymore") obj_info.pop("invalidated", None) def _load_objects(self, cls_spec_info, result, values): if type(cls_spec_info) is not tuple: return self._load_object(cls_spec_info, result, values) else: objects = [] values_start = values_end = 0 for cls_info in cls_spec_info: values_end += len(cls_info.columns) obj = self._load_object(cls_info, result, values[values_start:values_end]) objects.append(obj) values_start = values_end return tuple(objects) def _load_object(self, cls_info, result, values): # _set_values() need the cls_info columns for the class of the # actual object, not from a possible wrapper (e.g. an alias). cls = cls_info.cls cls_info = get_cls_info(cls) # Prepare cache key. primary_vars = [] columns = cls_info.columns is_null = True for i in cls_info.primary_key_pos: value = values[i] if value is not None: is_null = False variable = columns[i].variable_factory(value=value, from_db=True) primary_vars.append(variable) if is_null: # We've got a row full of NULLs, so consider that the object # wasn't found. This is useful for joins, where unexistent # rows are reprsented like that. return None # Lookup cache. obj_info = self._cache.get((cls, tuple(primary_vars))) if obj_info is not None: # Found object in cache, and it must be valid since the # primary key was extracted from result values. obj_info.pop("invalidated", None) # We're not sure if the obj is still in memory at this point. obj = obj_info.get_obj() if obj is not None: # Great, the object is still in memory. Nothing to do. pass else: # Object died while obj_info was still in memory. # Rebuild the object and maintain the obj_info. obj = self._rebuild_deleted_object(obj_info) else: # Nothing found in the cache. 
Build everything from the ground. obj = cls.__new__(cls) obj_info = get_obj_info(obj) obj_info["store"] = self self._set_values(obj_info, cls_info.columns, result, values) obj_info.checkpoint() self._add_to_cache(obj_info) self._enable_change_notification(obj_info) self._enable_lazy_resolving(obj_info) self._run_hook(obj_info, "__storm_loaded__") return obj def _rebuild_deleted_object(self, obj_info): """Rebuild a deleted object and maintain the obj_info.""" cls = obj_info.cls_info.cls obj = cls.__new__(cls) obj_info.set_obj(obj) set_obj_info(obj, obj_info) self._run_hook(obj_info, "__storm_loaded__") return obj @staticmethod def _run_hook(obj_info, hook_name): func = getattr(obj_info.get_obj(), hook_name, None) if func is not None: func() def _set_values(self, obj_info, columns, result, values): if values is None: raise LostObjectError("Can't obtain values from the database " "(object got removed?)") for column, value in zip(columns, values): if value is None: obj_info.variables[column].set(value, from_db=True) else: result.set_variable(obj_info.variables[column], value) def _is_dirty(self, obj_info): return obj_info in self._dirty def _set_dirty(self, obj_info): self._dirty[obj_info] = obj_info.get_obj() def _set_clean(self, obj_info): self._dirty.pop(obj_info, None) def _iter_dirty(self): return self._dirty def _add_to_cache(self, obj_info): """Add an object to the cache, keyed on primary key variables. When an object is added to the cache, the key is built from a copy of the current variables that are part of the primary key. This means that, when an object is retrieved from the database, these values may be used to get the cached object which is already in memory, even if it requested the primary key value to be changed. For that reason, when changes to the primary key are flushed, the cache key should also be updated to reflect these changes. """ cls_info = obj_info.cls_info old_primary_vars = obj_info.get("primary_vars") if old_primary_vars is not None: self._cache.pop((cls_info.cls, old_primary_vars), None) new_primary_vars = tuple(variable.copy() for variable in obj_info.primary_vars) self._cache[cls_info.cls, new_primary_vars] = obj_info obj_info["primary_vars"] = new_primary_vars def _remove_from_cache(self, obj_info): """Remove an object from the cache. This method is only called for objects that were explicitly deleted and flushed. Objects that are unused will get removed from the cache dictionary automatically by their weakref callbacks. """ primary_vars = obj_info.get("primary_vars") if primary_vars is not None: del self._cache[obj_info.cls_info.cls, primary_vars] del obj_info["primary_vars"] def _iter_cached(self): return self._cache.values() def _enable_change_notification(self, obj_info): obj_info.event.hook("changed", self._variable_changed) def _disable_change_notification(self, obj_info): obj_info.event.unhook("changed", self._variable_changed) def _variable_changed(self, obj_info, variable, old_value, new_value, fromdb): # The fromdb check makes sure that values coming from the # database don't mark the object as dirty again. # XXX The fromdb check is untested. How to test it? if not fromdb: if new_value is not Undef and new_value is not AutoReload: if obj_info.get("invalidated"): # This might be a previously cached object being # updated. Let's validate it now to improve debugging. # This will raise LostObjectError if the object is gone. 
self._fill_missing_values(obj_info, obj_info["primary_vars"]) self._set_dirty(obj_info) def _enable_lazy_resolving(self, obj_info): obj_info.event.hook("resolve-lazy-value", self._resolve_lazy_value) def _disable_lazy_resolving(self, obj_info): obj_info.event.unhook("resolve-lazy-value", self._resolve_lazy_value) def _resolve_lazy_value(self, obj_info, variable, lazy_value): if lazy_value is AutoReload: cached_primary_vars = obj_info.get("primary_vars") if cached_primary_vars is None: # XXX See the comment on self.flush() below. self.flush() else: idx = obj_info.cls_info.primary_key_idx.get(id(variable.column)) if idx is not None: # No need to touch the database if auto-reloading # a primary key variable. variable.set(cached_primary_vars[idx].get()) else: self._fill_missing_values(obj_info, cached_primary_vars) else: # XXX This will do it for now, but it should really flush # just this single object and ones that it depends on. # _flush_one() doesn't consider dependencies, so it may # not be used directly. self.flush()class ResultSet(object): def __init__(self, store, cls_spec_info, where=Undef, tables=Undef, select=Undef): self._store = store self._cls_spec_info = cls_spec_info self._where = where self._tables = tables self._select = select self._order_by = getattr(cls_spec_info, "default_order", Undef) self._offset = Undef self._limit = Undef self._distinct = False def copy(self): """Return a copy of this resultset object, with the same configuration. """ result_set = object.__new__(self.__class__) result_set.__dict__.update(self.__dict__) return result_set def config(self, distinct=None, offset=None, limit=None): """Configure this result object in-place. All parameters are optional. @param distinct: Boolean enabling/disabling usage of the DISTINCT keyword in the query made. @param offset: Offset where results will start to be retrieved from the result set. @param limit: Limit the number of objects retrieved from the result set. @return: self (not a copy). """ if distinct is not None: self._distinct = distinct if offset is not None: self._offset = offset if limit is not None: self._limit = limit return self def _get_select(self): if self._select is not Undef: if self._order_by is not Undef: self._select.order_by = self._order_by if self._limit is not Undef: # XXX UNTESTED! self._select.limit = self._limit if self._offset is not Undef: # XXX UNTESTED! 
self._select.offset = self._offset return self._select if type(self._cls_spec_info) is tuple: columns = [] default_tables = [] for cls_info in self._cls_spec_info: columns.append(cls_info.columns) default_tables.append(cls_info.table) else: columns = self._cls_spec_info.columns default_tables = self._cls_spec_info.table return Select(columns, self._where, self._tables, default_tables, self._order_by, offset=self._offset, limit=self._limit, distinct=self._distinct) def _load_objects(self, result, values): return self._store._load_objects(self._cls_spec_info, result, values) def __iter__(self): result = self._store._connection.execute(self._get_select()) for values in result: yield self._load_objects(result, values) def __getitem__(self, index): if isinstance(index, (int, long)): if index == 0: result_set = self else: if self._offset is not Undef: index += self._offset result_set = self.copy() result_set.config(offset=index, limit=1) obj = result_set.any() if obj is None: raise IndexError("Index out of range") return obj if not isinstance(index, slice): raise IndexError("Can't index ResultSets with %r" % (index,)) if index.step is not None: raise IndexError("Stepped slices not yet supported: %r" % (index.step,)) offset = self._offset limit = self._limit if index.start is not None: if offset is Undef: offset = index.start else: offset += index.start if limit is not Undef: limit = max(0, limit - index.start) if index.stop is not None: if index.start is None: new_limit = index.stop else: new_limit = index.stop - index.start if limit is Undef or limit > new_limit: limit = new_limit return self.copy().config(offset=offset, limit=limit) def any(self): """Return a single item from the result set. See also one(), first(), and last(). """ select = self._get_select() select.limit = 1
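A quick, database-free illustration of the slice handling in ResultSet.__getitem__ above: slicing never fetches rows in Python, it only folds the slice bounds into the offset/limit that will later be sent to the database. The compose_slice helper below is not part of store.py; it is a standalone sketch of that arithmetic, with None standing in for Storm's Undef, so the behaviour can be checked without a Store or a connection.

# Standalone sketch (not part of store.py): mirrors the offset/limit
# composition that ResultSet.__getitem__ performs for non-stepped slices.
# "compose_slice" is a hypothetical name; None plays the role of Undef.

def compose_slice(offset, limit, start, stop):
    """Fold a slice [start:stop] into an existing (offset, limit) pair."""
    if start is not None:
        offset = start if offset is None else offset + start
        if limit is not None:
            limit = max(0, limit - start)
    if stop is not None:
        new_limit = stop if start is None else stop - start
        if limit is None or limit > new_limit:
            limit = new_limit
    return offset, limit

# result_set[5:15] on an unconfigured result set keeps rows 5..14.
assert compose_slice(None, None, 5, 15) == (5, 10)
# Slicing that copy again with [2:6] narrows the window to rows 7..10.
assert compose_slice(5, 10, 2, 6) == (7, 4)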