⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 base.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
    Connects a Pool, a Dialect and a CompilerFactory together to
    provide a default implementation of SchemaEngine.
    """

    def __init__(self, pool, dialect, url, echo=None):
        """Store the pool, dialect and URL and create this engine's logger.

        ``echo`` is stored on the instance and is also handed to
        ``logging.instance_logger`` as ``echoflag``.
        """

        # NOTE(review): ``logging`` is presumably the package-local logging
        # helper module rather than the stdlib one (``instance_logger`` and
        # ``echo_property`` are not stdlib names) -- confirm against imports.
        self.pool = pool
        self.url = url
        self.dialect=dialect
        self.echo = echo
        # An Engine is its own ``engine``, so code that reads ``.engine`` off
        # a bindable object can be handed an Engine directly.
        self.engine = self
        self.logger = logging.instance_logger(self, echoflag=echo)

    def name(self):
        "String name of the [sqlalchemy.engine#Dialect] in use by this ``Engine``."

        # Looks up the module that defines the dialect class and reads the
        # 'name' entry of that module's ``descriptor()`` result.
        return sys.modules[self.dialect.__module__].descriptor()['name']
    name = property(name)

    # Class-level ``echo`` attribute built by ``logging.echo_property()``.
    # NOTE(review): presumably a descriptor that keeps the logger in sync with
    # the flag assigned in ``__init__`` -- confirm in the logging helper.
    echo = logging.echo_property()

    def __repr__(self):
        """Return ``'Engine(<url>)'`` using ``str()`` of the stored URL."""

        return 'Engine(%s)' % str(self.url)

    def dispose(self):
        """Dispose of the current pool and replace it with ``pool.recreate()``."""

        self.pool.dispose()
        self.pool = self.pool.recreate()

    def create(self, entity, connection=None, **kwargs):
        """Create a table or index within this engine's database connection given a schema.Table object."""

        self._run_visitor(self.dialect.schemagenerator, entity, connection=connection, **kwargs)

    def drop(self, entity, connection=None, **kwargs):
        """Drop a table or index within this engine's database connection given a schema.Table object."""

        self._run_visitor(self.dialect.schemadropper, entity, connection=connection, **kwargs)

    def _execute_default(self, default):
        """Run ``default`` through a contextual connection's ``_execute_default``.

        The connection obtained here is always closed, whether or not the
        call raises.
        """

        connection = self.contextual_connect()
        try:
            return connection._execute_default(default)
        finally:
            connection.close()

    def func(self):
        """Return an ``expression._FunctionGenerator`` bound to this engine."""

        return expression._FunctionGenerator(bind=self)
    func = property(func)

    def text(self, text, *args, **kwargs):
        """Return a sql.text() object for performing literal queries."""

        return expression.text(text, bind=self, *args, **kwargs)

    def _run_visitor(self, visitorcallable, element, connection=None, **kwargs):
        """Build a visitor and have it traverse ``element``.

        ``visitorcallable`` is called as
        ``visitorcallable(self.dialect, conn, **kwargs)`` and the result's
        ``traverse(element)`` is invoked.  When ``connection`` is None a
        contextual connection is opened for the call and closed afterwards;
        a caller-supplied connection is left open.
        """

        if connection is None:
            conn = self.contextual_connect(close_with_result=False)
        else:
            conn = connection
        try:
            visitorcallable(self.dialect, conn, **kwargs).traverse(element)
        finally:
            # Only close what this method opened.
            if connection is None:
                conn.close()

    def transaction(self, callable_, connection=None, *args, **kwargs):
        """Execute the given function within a transaction boundary.

        This is a shortcut for explicitly calling `begin()` and `commit()`
        and optionally `rollback()` when exceptions are raised.  The
        given `*args` and `**kwargs` will be passed to the function, as
        well as the Connection used in the transaction.

        Returns whatever ``callable_`` returns.  A connection opened here
        (``connection`` is None) is closed on exit; a caller-supplied
        connection is left open.
        """

        if connection is None:
            conn = self.contextual_connect()
        else:
            conn = connection
        try:
            trans = conn.begin()
            try:
                ret = callable_(conn, *args, **kwargs)
                trans.commit()
                return ret
            except:
                # Bare except is deliberate: roll back on any exception,
                # then re-raise it unchanged.
                trans.rollback()
                raise
        finally:
            if connection is None:
                conn.close()

    def run_callable(self, callable_, connection=None, *args, **kwargs):
        """Call ``callable_(conn, *args, **kwargs)`` and return its result.

        Uses ``connection`` when given; otherwise opens a contextual
        connection and closes it after the call.  No transaction is begun
        here.
        """

        if connection is None:
            conn = self.contextual_connect()
        else:
            conn = connection
        try:
            return callable_(conn, *args, **kwargs)
        finally:
            if connection is None:
                conn.close()

    def execute(self, statement, *multiparams, **params):
        """Execute ``statement`` on a contextual connection and return the result.

        The connection is requested with ``close_with_result=True`` and is
        not closed by this method.
        """

        # NOTE(review): presumably close_with_result makes the Connection
        # close itself once the returned result is closed -- confirm in
        # Connection, which is not visible in this chunk.
        connection = self.contextual_connect(close_with_result=True)
        return connection.execute(statement, *multiparams, **params)

    def scalar(self, statement, *multiparams, **params):
        """Return ``self.execute(...).scalar()`` for the given statement."""

        return self.execute(statement, *multiparams, **params).scalar()

    def execute_clauseelement(self, elem, multiparams=None, params=None):
        """Delegate to ``Connection.execute_clauseelement`` on a contextual connection.

        The connection is requested with ``close_with_result=True`` and is
        not closed by this method.
        """

        connection = self.contextual_connect(close_with_result=True)
        return connection.execute_clauseelement(elem, multiparams, params)

    def _execute_compiled(self, compiled, multiparams, params):
        """Delegate to ``Connection._execute_compiled`` on a contextual connection.

        The connection is requested with ``close_with_result=True`` and is
        not closed by this method.
        """

        connection = self.contextual_connect(close_with_result=True)
        return connection._execute_compiled(compiled, multiparams, params)

    def statement_compiler(self, statement, **kwargs):
        """Return ``self.dialect.statement_compiler(...)`` for ``statement``, bound to this engine."""

        return self.dialect.statement_compiler(self.dialect, statement, bind=self, **kwargs)

    def connect(self, **kwargs):
        """Return a newly allocated Connection object."""

        return Connection(self, **kwargs)

    def contextual_connect(self, close_with_result=False, **kwargs):
        """Return a Connection object which may be newly allocated, or may be part of some ongoing context.

        This Connection is meant to be used by the various "auto-connecting" operations.

        This implementation always wraps a fresh ``self.pool.connect()`` in
        a new Connection.
        """

        return Connection(self, self.pool.connect(), close_with_result=close_with_result, **kwargs)

    def table_names(self, schema=None, connection=None):
        """Return a list of all table names available in the database.

        schema:
          Optional, retrieve names from a non-default schema.

        connection:
          Optional, use a specified connection.  Default is the
          ``contextual_connect`` for this ``Engine``.
        """

        if connection is None:
            conn = self.contextual_connect()
        else:
            conn = connection
        if not schema:
            # Fall back to the dialect's default schema; dialects that do not
            # implement the lookup leave ``schema`` as passed in.
            try:
                schema =  self.dialect.get_default_schema_name(conn)
            except NotImplementedError:
                pass
        # NOTE(review): the default-schema lookup above runs outside the
        # try/finally, so an exception other than NotImplementedError raised
        # there leaves a connection opened here unclosed.
        try:
            return self.dialect.table_names(conn, schema)
        finally:
            if connection is None:
                conn.close()

    def reflecttable(self, table, connection=None, include_columns=None):
        """Given a Table object, reflects its columns and properties from the database."""

        if connection is None:
            conn = self.contextual_connect()
        else:
            conn = connection
        try:
            self.dialect.reflecttable(conn, table, include_columns)
        finally:
            # Only close what this method opened.
            if connection is None:
                conn.close()

    def has_table(self, table_name, schema=None):
        """Return the dialect's ``has_table`` answer for ``table_name``.

        Runs through ``run_callable``, so a contextual connection is opened
        and closed around the check.
        """

        return self.run_callable(lambda c: self.dialect.has_table(c, table_name, schema=schema))

    def raw_connection(self):
        """Return a DB-API connection."""

        return self.pool.unique_connection()


class RowProxy(object):
    """Proxy a single cursor row for a parent ResultProxy.

    Mostly follows "ordered dictionary" behavior, mapping result
    values to the string-based column name, the integer position of
    the result in the row, as well as Column instances which can be
    mapped to the original Columns that produced this result set (for
    results that correspond to constructed SQL expressions).
    """

    def __init__(self, parent, row):
        """RowProxy objects are constructed by ResultProxy objects."""

        self.__parent = parent
        self.__row = row
        # ``_ResultProxy__echo`` is the name-mangled form of the ``__echo``
        # attribute that ResultProxy.__init__ sets on itself.
        if self.__parent._ResultProxy__echo:
            self.__parent.context.engine.logger.debug("Row " + repr(row))

    def close(self):
        """Close the parent ResultProxy."""

        self.__parent.close()

    def __contains__(self, key):
        """Return the parent's ``_has_key`` answer for ``key`` on this row."""

        return self.__parent._has_key(self.__row, key)

    def __len__(self):
        """Return the number of entries in the underlying row."""

        return len(self.__row)

    def __iter__(self):
        """Yield each column value in positional order, fetched via the parent's ``_get_col``."""

        for i in xrange(len(self.__row)):
            yield self.__parent._get_col(self.__row, i)

    def __eq__(self, other):
        """Return True if ``other`` is this object or equals the tuple of this row's values."""

        return ((other is self) or
                (other == tuple([self.__parent._get_col(self.__row, key)
                                 for key in xrange(len(self.__row))])))

    def __ne__(self, other):
        """Return the negation of ``__eq__``."""

        return not self.__eq__(other)

    def __repr__(self):
        """Return the ``repr()`` of this row's values as a tuple."""

        return repr(tuple(self))

    def has_key(self, key):
        """Return True if this RowProxy contains the given key."""

        return self.__parent._has_key(self.__row, key)

    def __getitem__(self, key):
        """Return the value for ``key`` via the parent's ``_get_col``."""

        return self.__parent._get_col(self.__row, key)

    def __getattr__(self, name):
        """Expose columns as attributes.

        A KeyError from the parent's ``_get_col`` is re-raised as
        AttributeError carrying the KeyError's first argument.
        """

        try:
            return self.__parent._get_col(self.__row, name)
        except KeyError, e:
            raise AttributeError(e.args[0])

    def items(self):
        """Return a list of tuples, each tuple containing a key/value pair."""

        return [(key, getattr(self, key)) for key in self.keys()]

    def keys(self):
        """Return the list of keys as strings represented by this RowProxy."""

        # ``keys`` is read off the parent without being called.
        # NOTE(review): presumably a property on ResultProxy -- its
        # definition is outside this chunk.
        return self.__parent.keys

    def values(self):
        """Return the values represented by this RowProxy as a list."""

        return list(self)


class BufferedColumnRow(RowProxy):
    """RowProxy that fetches every column value up front.

    At construction each position of the raw row is passed through
    ``ResultProxy._get_col`` and the resulting list is stored as the row.
    """

    def __init__(self, parent, row):
        # ``_get_col`` is called on the ResultProxy class with ``parent``
        # passed explicitly, so an override on the parent's own class is
        # not used for this pre-fetch.
        row = [ResultProxy._get_col(parent, row, i) for i in xrange(len(row))]
        super(BufferedColumnRow, self).__init__(parent, row)


class ResultProxy(object):
    """Wraps a DB-API cursor object to provide easier access to row columns.

    Individual columns may be accessed by their integer position,
    case-insensitive column name, or by ``schema.Column``
    object. e.g.::

      row = fetchone()

      col1 = row[0]    # access via integer position

      col2 = row['col2']   # access via name

      col3 = row[mytable.c.mycol] # access via Column object.

    ResultProxy also contains a map of TypeEngine objects and will
    invoke the appropriate ``result_processor()`` method before
    returning columns, as well as the ExecutionContext corresponding
    to the statement execution.  It provides several methods for which
    to obtain information from the underlying ExecutionContext.
    """

    # Row wrapper class for this result type.
    # NOTE(review): the code that applies it is outside this chunk.
    _process_row = RowProxy

    def __init__(self, context):
        """ResultProxy objects are constructed via the execute() method on SQLEngine."""

        self.context = context
        self.dialect = context.dialect
        self.closed = False
        self.cursor = context.cursor
        self.connection = context.root_connection
        # Stored name-mangled as ``_ResultProxy__echo``; RowProxy reads it
        # under that name.
        self.__echo = context.engine._should_log_info
        if context.returns_rows:
            self._init_metadata()
            self._rowcount = None
        else:
            # No rows to fetch: capture the rowcount now, then close.
            self._rowcount = context.get_rowcount()
            self.close()

    def rowcount(self):
        """Return the rowcount captured at construction, else ask the context."""

        if self._rowcount is not None:
            return self._rowcount
        else:
            return self.context.get_rowcount()
    rowcount = property(rowcount)

    def lastrowid(self):
        """Return the underlying cursor's ``lastrowid``."""

        return self.cursor.lastrowid
    lastrowid = property(lastrowid)

    def out_parameters(self):
        """Return the execution context's ``out_parameters``."""

        return self.context.out_parameters
    out_parameters = property(out_parameters)

    def _init_metadata(self):
        """Build the column lookup tables from ``cursor.description``.

        Fills ``self.__props`` with ``(type, result_processor, index)``
        records keyed by lower-cased column name, by integer position and
        by each object listed for the column in the context's
        ``result_map``; fills ``self.__keys`` with the column names in
        cursor order.
        """

        self.__props = {}
        # Must follow the ``__props`` assignment: _create_key_cache takes a
        # local reference to that dict.
        self._key_cache = self._create_key_cache()
        self.__keys = []
        metadata = self.cursor.description

        if metadata is not None:
            typemap = self.dialect.dbapi_type_map

            for i, item in enumerate(metadata):
                # Per DB-API, item[0] is the column name and item[1] the
                # type code.
                colname = item[0].decode(self.dialect.encoding)

                if '.' in colname:
                    # sqlite will in some circumstances prepend table name to colnames, so strip
                    origname = colname
                    colname = colname.split('.')[-1]
                else:
                    origname = None

                if self.context.result_map:
                    try:
                        (name, obj, type_) = self.context.result_map[colname.lower()]
                    except KeyError:
                        # Column not in the result map: type it from the
                        # DB-API type code, defaulting to NULLTYPE.
                        (name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE))
                else:
                    (name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE))

                rec = (type_, type_.dialect_impl(self.dialect).result_processor(self.dialect), i)

                # setdefault returns the existing record when the name was
                # already registered; in that case the entry is replaced
                # with one built from __ambiguous_processor.
                # NOTE(review): __ambiguous_processor is defined outside
                # this chunk; presumably it raises on access -- confirm.
                if self.__props.setdefault(name.lower(), rec) is not rec:
                    self.__props[name.lower()] = (type_, self.__ambiguous_processor(name), 0)

                # store the "origname" if we truncated (sqlite only)
                if origname:
                    if self.__props.setdefault(origname.lower(), rec) is not rec:
                        self.__props[origname.lower()] = (type_, self.__ambiguous_processor(origname), 0)

                self.__keys.append(colname)
                self.__props[i] = rec
                if obj:
                    for o in obj:
                        self.__props[o] = rec

            if self.__echo:
                self.context.engine.logger.debug("Col " + repr(tuple([x[0] for x in metadata])))

    def _create_key_cache(self):
        # local copies to avoid circular ref against 'self'
        props = self.__props
        context = self.context
        def lookup_key(key):
            """Given a key, which could be a ColumnElement, string, etc.,
            matches it to the appropriate key we got from the result set's
            metadata; then cache it locally for quick re-access."""

            if isinstance(key, basestring):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -