📄 base.py
字号:
key = key.lower() try: rec = props[key] except KeyError: # fallback for targeting a ColumnElement to a textual expression if isinstance(key, expression.ColumnElement): if key._label.lower() in props: return props[key._label.lower()] elif key.name.lower() in props: return props[key.name.lower()] raise exceptions.NoSuchColumnError("Could not locate column in row for column '%s'" % (str(key))) return rec return util.PopulateDict(lookup_key) def __ambiguous_processor(self, colname): def process(value): raise exceptions.InvalidRequestError("Ambiguous column name '%s' in result set! try 'use_labels' option on select statement." % colname) return process def close(self): """Close this ResultProxy, and the underlying DB-API cursor corresponding to the execution. If this ResultProxy was generated from an implicit execution, the underlying Connection will also be closed (returns the underlying DB-API connection to the connection pool.) This method is also called automatically when all result rows are exhausted. """ if not self.closed: self.closed = True self.cursor.close() if self.connection.should_close_with_result: self.connection.close() def keys(self): return self.__keys keys = property(keys) def _has_key(self, row, key): try: # _key_cache uses __missing__ in 2.5, so not much alternative # to catching KeyError self._key_cache[key] return True except KeyError: return False def __iter__(self): while True: row = self.fetchone() if row is None: raise StopIteration else: yield row def last_inserted_ids(self): """Return ``last_inserted_ids()`` from the underlying ExecutionContext. See ExecutionContext for details. """ return self.context.last_inserted_ids() def last_updated_params(self): """Return ``last_updated_params()`` from the underlying ExecutionContext. See ExecutionContext for details. """ return self.context.last_updated_params() def last_inserted_params(self): """Return ``last_inserted_params()`` from the underlying ExecutionContext. See ExecutionContext for details. """ return self.context.last_inserted_params() def lastrow_has_defaults(self): """Return ``lastrow_has_defaults()`` from the underlying ExecutionContext. See ExecutionContext for details. """ return self.context.lastrow_has_defaults() def postfetch_cols(self): """Return ``postfetch_cols()`` from the underlying ExecutionContext. See ExecutionContext for details. """ return self.context.postfetch_cols def supports_sane_rowcount(self): """Return ``supports_sane_rowcount`` from the dialect. """ return self.dialect.supports_sane_rowcount def supports_sane_multi_rowcount(self): """Return ``supports_sane_multi_rowcount`` from the dialect. """ return self.dialect.supports_sane_multi_rowcount def _get_col(self, row, key): try: type_, processor, index = self._key_cache[key] except TypeError: # the 'slice' use case is very infrequent, # so we use an exception catch to reduce conditionals in _get_col if isinstance(key, slice): indices = key.indices(len(row)) return tuple([self._get_col(row, i) for i in xrange(*indices)]) else: raise if processor: return processor(row[index]) else: return row[index] def _fetchone_impl(self): return self.cursor.fetchone() def _fetchmany_impl(self, size=None): return self.cursor.fetchmany(size) def _fetchall_impl(self): return self.cursor.fetchall() def fetchall(self): """Fetch all rows, just like DB-API ``cursor.fetchall()``.""" l = [self._process_row(self, row) for row in self._fetchall_impl()] self.close() return l def fetchmany(self, size=None): """Fetch many rows, just like DB-API ``cursor.fetchmany(size=cursor.arraysize)``.""" l = [self._process_row(self, row) for row in self._fetchmany_impl(size)] if len(l) == 0: self.close() return l def fetchone(self): """Fetch one row, just like DB-API ``cursor.fetchone()``.""" row = self._fetchone_impl() if row is not None: return self._process_row(self, row) else: self.close() return None def scalar(self): """Fetch the first column of the first row, and close the result set.""" row = self._fetchone_impl() try: if row is not None: return self._process_row(self, row)[0] else: return None finally: self.close()class BufferedRowResultProxy(ResultProxy): """A ResultProxy with row buffering behavior. ``ResultProxy`` that buffers the contents of a selection of rows before ``fetchone()`` is called. This is to allow the results of ``cursor.description`` to be available immediately, when interfacing with a DB-API that requires rows to be consumed before this information is available (currently psycopg2, when used with server-side cursors). The pre-fetching behavior fetches only one row initially, and then grows its buffer size by a fixed amount with each successive need for additional rows up to a size of 100. """ def _init_metadata(self): self.__buffer_rows() super(BufferedRowResultProxy, self)._init_metadata() # this is a "growth chart" for the buffering of rows. # each successive __buffer_rows call will use the next # value in the list for the buffer size until the max # is reached size_growth = { 1 : 5, 5 : 10, 10 : 20, 20 : 50, 50 : 100 } def __buffer_rows(self): size = getattr(self, '_bufsize', 1) self.__rowbuffer = self.cursor.fetchmany(size) #self.context.engine.logger.debug("Buffered %d rows" % size) self._bufsize = self.size_growth.get(size, size) def _fetchone_impl(self): if self.closed: return None if len(self.__rowbuffer) == 0: self.__buffer_rows() if len(self.__rowbuffer) == 0: return None return self.__rowbuffer.pop(0) def _fetchmany_impl(self, size=None): result = [] for x in range(0, size): row = self._fetchone_impl() if row is None: break result.append(row) return result def _fetchall_impl(self): return self.__rowbuffer + list(self.cursor.fetchall())class BufferedColumnResultProxy(ResultProxy): """A ResultProxy with column buffering behavior. ``ResultProxy`` that loads all columns into memory each time fetchone() is called. If fetchmany() or fetchall() are called, the full grid of results is fetched. This is to operate with databases where result rows contain "live" results that fall out of scope unless explicitly fetched. Currently this includes just cx_Oracle LOB objects, but this behavior is known to exist in other DB-APIs as well (Pygresql, currently unsupported). """ _process_row = BufferedColumnRow def _get_col(self, row, key): try: rec = self._key_cache[key] return row[rec[2]] except TypeError: # the 'slice' use case is very infrequent, # so we use an exception catch to reduce conditionals in _get_col if isinstance(key, slice): indices = key.indices(len(row)) return tuple([self._get_col(row, i) for i in xrange(*indices)]) else: raise def fetchall(self): l = [] while True: row = self.fetchone() if row is None: break l.append(row) return l def fetchmany(self, size=None): if size is None: return self.fetchall() l = [] for i in xrange(size): row = self.fetchone() if row is None: break l.append(row) return lclass SchemaIterator(schema.SchemaVisitor): """A visitor that can gather text into a buffer and execute the contents of the buffer.""" def __init__(self, connection): """Construct a new SchemaIterator. """ self.connection = connection self.buffer = StringIO.StringIO() def append(self, s): """Append content to the SchemaIterator's query buffer.""" self.buffer.write(s) def execute(self): """Execute the contents of the SchemaIterator's buffer.""" try: return self.connection.execute(self.buffer.getvalue()) finally: self.buffer.truncate(0)class DefaultRunner(schema.SchemaVisitor): """A visitor which accepts ColumnDefault objects, produces the dialect-specific SQL corresponding to their execution, and executes the SQL, returning the result value. DefaultRunners are used internally by Engines and Dialects. Specific database modules should provide their own subclasses of DefaultRunner to allow database-specific behavior. """ def __init__(self, context): self.context = context self.dialect = context.dialect def get_column_default(self, column): if column.default is not None: return self.traverse_single(column.default) else: return None def get_column_onupdate(self, column): if column.onupdate is not None: return self.traverse_single(column.onupdate) else: return None def visit_passive_default(self, default): """Do nothing. Passive defaults by definition return None on the app side, and are post-fetched to get the DB-side value. """ return None def visit_sequence(self, seq): """Do nothing. """ return None def exec_default_sql(self, default): conn = self.context.connection c = expression.select([default.arg]).compile(bind=conn) return conn._execute_compiled(c).scalar() def execute_string(self, stmt, params=None): """execute a string statement, using the raw cursor, and return a scalar result.""" conn = self.context._connection if isinstance(stmt, unicode) and not self.dialect.supports_unicode_statements: stmt = stmt.encode(self.dialect.encoding) conn._cursor_execute(self.context.cursor, stmt, params) return self.context.cursor.fetchone()[0] def visit_column_onupdate(self, onupdate): if isinstance(onupdate.arg, expression.ClauseElement): return self.exec_default_sql(onupdate) elif callable(onupdate.arg): return onupdate.arg(self.context) else: return onupdate.arg def visit_column_default(self, default): if isinstance(default.arg, expression.ClauseElement): return self.exec_default_sql(default) elif callable(default.arg): return default.arg(self.context) else: return default.arg
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -