📄 base.py
字号:
Connects a Pool, a Dialect and a CompilerFactory together to provide a default implementation of SchemaEngine. """ def __init__(self, pool, dialect, url, echo=None): self.pool = pool self.url = url self.dialect=dialect self.echo = echo self.engine = self self.logger = logging.instance_logger(self, echoflag=echo) def name(self): "String name of the [sqlalchemy.engine#Dialect] in use by this ``Engine``." return sys.modules[self.dialect.__module__].descriptor()['name'] name = property(name) echo = logging.echo_property() def __repr__(self): return 'Engine(%s)' % str(self.url) def dispose(self): self.pool.dispose() self.pool = self.pool.recreate() def create(self, entity, connection=None, **kwargs): """Create a table or index within this engine's database connection given a schema.Table object.""" self._run_visitor(self.dialect.schemagenerator, entity, connection=connection, **kwargs) def drop(self, entity, connection=None, **kwargs): """Drop a table or index within this engine's database connection given a schema.Table object.""" self._run_visitor(self.dialect.schemadropper, entity, connection=connection, **kwargs) def _execute_default(self, default): connection = self.contextual_connect() try: return connection._execute_default(default) finally: connection.close() def func(self): return expression._FunctionGenerator(bind=self) func = property(func) def text(self, text, *args, **kwargs): """Return a sql.text() object for performing literal queries.""" return expression.text(text, bind=self, *args, **kwargs) def _run_visitor(self, visitorcallable, element, connection=None, **kwargs): if connection is None: conn = self.contextual_connect(close_with_result=False) else: conn = connection try: visitorcallable(self.dialect, conn, **kwargs).traverse(element) finally: if connection is None: conn.close() def transaction(self, callable_, connection=None, *args, **kwargs): """Execute the given function within a transaction boundary. This is a shortcut for explicitly calling `begin()` and `commit()` and optionally `rollback()` when exceptions are raised. The given `*args` and `**kwargs` will be passed to the function, as well as the Connection used in the transaction. """ if connection is None: conn = self.contextual_connect() else: conn = connection try: trans = conn.begin() try: ret = callable_(conn, *args, **kwargs) trans.commit() return ret except: trans.rollback() raise finally: if connection is None: conn.close() def run_callable(self, callable_, connection=None, *args, **kwargs): if connection is None: conn = self.contextual_connect() else: conn = connection try: return callable_(conn, *args, **kwargs) finally: if connection is None: conn.close() def execute(self, statement, *multiparams, **params): connection = self.contextual_connect(close_with_result=True) return connection.execute(statement, *multiparams, **params) def scalar(self, statement, *multiparams, **params): return self.execute(statement, *multiparams, **params).scalar() def execute_clauseelement(self, elem, multiparams=None, params=None): connection = self.contextual_connect(close_with_result=True) return connection.execute_clauseelement(elem, multiparams, params) def _execute_compiled(self, compiled, multiparams, params): connection = self.contextual_connect(close_with_result=True) return connection._execute_compiled(compiled, multiparams, params) def statement_compiler(self, statement, **kwargs): return self.dialect.statement_compiler(self.dialect, statement, bind=self, **kwargs) def connect(self, **kwargs): """Return a newly allocated Connection object.""" return Connection(self, **kwargs) def contextual_connect(self, close_with_result=False, **kwargs): """Return a Connection object which may be newly allocated, or may be part of some ongoing context. This Connection is meant to be used by the various "auto-connecting" operations. """ return Connection(self, self.pool.connect(), close_with_result=close_with_result, **kwargs) def table_names(self, schema=None, connection=None): """Return a list of all table names available in the database. schema: Optional, retrieve names from a non-default schema. connection: Optional, use a specified connection. Default is the ``contextual_connect`` for this ``Engine``. """ if connection is None: conn = self.contextual_connect() else: conn = connection if not schema: try: schema = self.dialect.get_default_schema_name(conn) except NotImplementedError: pass try: return self.dialect.table_names(conn, schema) finally: if connection is None: conn.close() def reflecttable(self, table, connection=None, include_columns=None): """Given a Table object, reflects its columns and properties from the database.""" if connection is None: conn = self.contextual_connect() else: conn = connection try: self.dialect.reflecttable(conn, table, include_columns) finally: if connection is None: conn.close() def has_table(self, table_name, schema=None): return self.run_callable(lambda c: self.dialect.has_table(c, table_name, schema=schema)) def raw_connection(self): """Return a DB-API connection.""" return self.pool.unique_connection()class RowProxy(object): """Proxy a single cursor row for a parent ResultProxy. Mostly follows "ordered dictionary" behavior, mapping result values to the string-based column name, the integer position of the result in the row, as well as Column instances which can be mapped to the original Columns that produced this result set (for results that correspond to constructed SQL expressions). """ def __init__(self, parent, row): """RowProxy objects are constructed by ResultProxy objects.""" self.__parent = parent self.__row = row if self.__parent._ResultProxy__echo: self.__parent.context.engine.logger.debug("Row " + repr(row)) def close(self): """Close the parent ResultProxy.""" self.__parent.close() def __contains__(self, key): return self.__parent._has_key(self.__row, key) def __len__(self): return len(self.__row) def __iter__(self): for i in xrange(len(self.__row)): yield self.__parent._get_col(self.__row, i) def __eq__(self, other): return ((other is self) or (other == tuple([self.__parent._get_col(self.__row, key) for key in xrange(len(self.__row))]))) def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return repr(tuple(self)) def has_key(self, key): """Return True if this RowProxy contains the given key.""" return self.__parent._has_key(self.__row, key) def __getitem__(self, key): return self.__parent._get_col(self.__row, key) def __getattr__(self, name): try: return self.__parent._get_col(self.__row, name) except KeyError, e: raise AttributeError(e.args[0]) def items(self): """Return a list of tuples, each tuple containing a key/value pair.""" return [(key, getattr(self, key)) for key in self.keys()] def keys(self): """Return the list of keys as strings represented by this RowProxy.""" return self.__parent.keys def values(self): """Return the values represented by this RowProxy as a list.""" return list(self)class BufferedColumnRow(RowProxy): def __init__(self, parent, row): row = [ResultProxy._get_col(parent, row, i) for i in xrange(len(row))] super(BufferedColumnRow, self).__init__(parent, row)class ResultProxy(object): """Wraps a DB-API cursor object to provide easier access to row columns. Individual columns may be accessed by their integer position, case-insensitive column name, or by ``schema.Column`` object. e.g.:: row = fetchone() col1 = row[0] # access via integer position col2 = row['col2'] # access via name col3 = row[mytable.c.mycol] # access via Column object. ResultProxy also contains a map of TypeEngine objects and will invoke the appropriate ``result_processor()`` method before returning columns, as well as the ExecutionContext corresponding to the statement execution. It provides several methods for which to obtain information from the underlying ExecutionContext. """ _process_row = RowProxy def __init__(self, context): """ResultProxy objects are constructed via the execute() method on SQLEngine.""" self.context = context self.dialect = context.dialect self.closed = False self.cursor = context.cursor self.connection = context.root_connection self.__echo = context.engine._should_log_info if context.returns_rows: self._init_metadata() self._rowcount = None else: self._rowcount = context.get_rowcount() self.close() def rowcount(self): if self._rowcount is not None: return self._rowcount else: return self.context.get_rowcount() rowcount = property(rowcount) def lastrowid(self): return self.cursor.lastrowid lastrowid = property(lastrowid) def out_parameters(self): return self.context.out_parameters out_parameters = property(out_parameters) def _init_metadata(self): self.__props = {} self._key_cache = self._create_key_cache() self.__keys = [] metadata = self.cursor.description if metadata is not None: typemap = self.dialect.dbapi_type_map for i, item in enumerate(metadata): colname = item[0].decode(self.dialect.encoding) if '.' in colname: # sqlite will in some circumstances prepend table name to colnames, so strip origname = colname colname = colname.split('.')[-1] else: origname = None if self.context.result_map: try: (name, obj, type_) = self.context.result_map[colname.lower()] except KeyError: (name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE)) else: (name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE)) rec = (type_, type_.dialect_impl(self.dialect).result_processor(self.dialect), i) if self.__props.setdefault(name.lower(), rec) is not rec: self.__props[name.lower()] = (type_, self.__ambiguous_processor(name), 0) # store the "origname" if we truncated (sqlite only) if origname: if self.__props.setdefault(origname.lower(), rec) is not rec: self.__props[origname.lower()] = (type_, self.__ambiguous_processor(origname), 0) self.__keys.append(colname) self.__props[i] = rec if obj: for o in obj: self.__props[o] = rec if self.__echo: self.context.engine.logger.debug("Col " + repr(tuple([x[0] for x in metadata]))) def _create_key_cache(self): # local copies to avoid circular ref against 'self' props = self.__props context = self.context def lookup_key(key): """Given a key, which could be a ColumnElement, string, etc., matches it to the appropriate key we got from the result set's metadata; then cache it locally for quick re-access.""" if isinstance(key, basestring):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -