⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 base.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
    def get_rowcount(self):        """Return the count of rows updated/deleted for an UPDATE/DELETE statement."""        raise NotImplementedError()    def should_autocommit_compiled(self, compiled):        """return True if the given Compiled object refers to a "committable" statement."""        raise NotImplementedError()    def should_autocommit_text(self, statement):        """Parse the given textual statement and return True if it refers to a "committable" statement"""        raise NotImplementedError()    def last_inserted_ids(self):        """Return the list of the primary key values for the last insert statement executed.        This does not apply to straight textual clauses; only to        ``sql.Insert`` objects compiled against a ``schema.Table``        object.  The order of items in the list is the same as that of        the Table's 'primary_key' attribute.        """        raise NotImplementedError()    def last_inserted_params(self):        """Return a dictionary of the full parameter dictionary for the last compiled INSERT statement.        Includes any ColumnDefaults or Sequences that were pre-executed.        """        raise NotImplementedError()    def last_updated_params(self):        """Return a dictionary of the full parameter dictionary for the last compiled UPDATE statement.        Includes any ColumnDefaults that were pre-executed.        """        raise NotImplementedError()    def lastrow_has_defaults(self):        """Return True if the last INSERT or UPDATE row contained        inlined or database-side defaults.        """        raise NotImplementedError()class Compiled(object):    """Represent a compiled SQL expression.    The ``__str__`` method of the ``Compiled`` object should produce    the actual text of the statement.  ``Compiled`` objects are    specific to their underlying database dialect, and also may    or may not be specific to the columns referenced within a    particular set of bind parameters.  In no case should the    ``Compiled`` object be dependent on the actual values of those    bind parameters, even though it may reference those values as    defaults.    """    def __init__(self, dialect, statement, column_keys=None, bind=None):        """Construct a new ``Compiled`` object.        dialect          ``Dialect`` to compile against.        statement          ``ClauseElement`` to be compiled.        column_keys          a list of column names to be compiled into an INSERT or UPDATE          statement.        bind          Optional Engine or Connection to compile this statement against.        """        self.dialect = dialect        self.statement = statement        self.column_keys = column_keys        self.bind = bind        self.can_execute = statement.supports_execution()    def compile(self):        """Produce the internal string representation of this element."""        raise NotImplementedError()    def __str__(self):        """Return the string text of the generated SQL statement."""        raise NotImplementedError()    def get_params(self, **params):        """Use construct_params().  (supports unicode names)        """        return self.construct_params(params)    get_params = util.deprecated(get_params)    def construct_params(self, params):        """Return the bind params for this compiled object.        `params` is a dict of string/object pairs whos        values will override bind values compiled in        to the statement.        """        raise NotImplementedError()    def execute(self, *multiparams, **params):        """Execute this compiled object."""        e = self.bind        if e is None:            raise exceptions.UnboundExecutionError("This Compiled object is not bound to any Engine or Connection.")        return e._execute_compiled(self, multiparams, params)    def scalar(self, *multiparams, **params):        """Execute this compiled object and return the result's scalar value."""        return self.execute(*multiparams, **params).scalar()class Connectable(object):    """Interface for an object that can provide an Engine and a Connection object which correponds to that Engine."""    def contextual_connect(self):        """Return a Connection object which may be part of an ongoing context."""        raise NotImplementedError()    def create(self, entity, **kwargs):        """Create a table or index given an appropriate schema object."""        raise NotImplementedError()    def drop(self, entity, **kwargs):        """Drop a table or index given an appropriate schema object."""        raise NotImplementedError()    def execute(self, object, *multiparams, **params):        raise NotImplementedError()    def execute_clauseelement(self, elem, multiparams=None, params=None):        raise NotImplementedError()class Connection(Connectable):    """Provides high-level functionality for a wrapped DB-API connection.    Provides execution support for string-based SQL statements as well    as ClauseElement, Compiled and DefaultGenerator objects.  Provides    a begin method to return Transaction objects.    The Connection object is **not** threadsafe.    """    def __init__(self, engine, connection=None, close_with_result=False,                 _branch=False):        """Construct a new Connection.        Connection objects are typically constructed by an        [sqlalchemy.engine#Engine], see the ``connect()`` and        ``contextual_connect()`` methods of Engine.        """        self.engine = engine        self.__connection = connection or engine.raw_connection()        self.__transaction = None        self.__close_with_result = close_with_result        self.__savepoint_seq = 0        self.__branch = _branch        self.__invalid = False    def _branch(self):        """Return a new Connection which references this Connection's        engine and connection; but does not have close_with_result enabled,        and also whose close() method does nothing.        This is used to execute "sub" statements within a single execution,        usually an INSERT statement.        """        return Connection(self.engine, self.__connection, _branch=True)    def dialect(self):        "Dialect used by this Connection."        return self.engine.dialect    dialect = property(dialect)    def closed(self):        """return True if this connection is closed."""        return not self.__invalid and '_Connection__connection' not in self.__dict__    closed = property(closed)    def invalidated(self):        """return True if this connection was invalidated."""        return self.__invalid    invalidated = property(invalidated)    def connection(self):        "The underlying DB-API connection managed by this Connection."        try:            return self.__connection        except AttributeError:            if self.__invalid:                if self.__transaction is not None:                    raise exceptions.InvalidRequestError("Can't reconnect until invalid transaction is rolled back")                self.__connection = self.engine.raw_connection()                self.__invalid = False                return self.__connection            raise exceptions.InvalidRequestError("This Connection is closed")    connection = property(connection)    def should_close_with_result(self):        """Indicates if this Connection should be closed when a corresponding        ResultProxy is closed; this is essentially an auto-release mode.        """        return self.__close_with_result    should_close_with_result = property(should_close_with_result)    def info(self):        """A collection of per-DB-API connection instance properties."""        return self.connection.info    info = property(info)    properties = property(info, doc="""An alias for the .info collection, will be removed in 0.5.""")    def connect(self):        """Returns self.        This ``Connectable`` interface method returns self, allowing        Connections to be used interchangably with Engines in most        situations that require a bind.        """        return self    def contextual_connect(self, **kwargs):        """Returns self.        This ``Connectable`` interface method returns self, allowing        Connections to be used interchangably with Engines in most        situations that require a bind.        """        return self    def invalidate(self, exception=None):        """Invalidate the underlying DBAPI connection associated with this Connection.        The underlying DB-API connection is literally closed (if        possible), and is discarded.  Its source connection pool will        typically lazily create a new connection to replace it.        Upon the next usage, this Connection will attempt to reconnect        to the pool with a new connection.        Transactions in progress remain in an "opened" state (even though        the actual transaction is gone); these must be explicitly        rolled back before a reconnect on this Connection can proceed.  This        is to prevent applications from accidentally continuing their transactional        operations in a non-transactional state.        """        if self.__connection.is_valid:            self.__connection.invalidate(exception)        del self.__connection        self.__invalid = True    def detach(self):        """Detach the underlying DB-API connection from its connection pool.        This Connection instance will remain useable.  When closed,        the DB-API connection will be literally closed and not        returned to its pool.  The pool will typically lazily create a        new connection to replace the detached connection.        This method can be used to insulate the rest of an application        from a modified state on a connection (such as a transaction        isolation level or similar).  Also see        [sqlalchemy.interfaces#PoolListener] for a mechanism to modify        connection state when connections leave and return to their        connection pool.        """        self.__connection.detach()    def begin(self):        """Begin a transaction and return a Transaction handle.        Repeated calls to ``begin`` on the same Connection will create        a lightweight, emulated nested transaction.  Only the        outermost transaction may ``commit``.  Calls to ``commit`` on        inner transactions are ignored.  Any transaction in the        hierarchy may ``rollback``, however.        """        if self.__transaction is None:            self.__transaction = RootTransaction(self)        else:            return Transaction(self, self.__transaction)        return self.__transaction    def begin_nested(self):        """Begin a nested transaction and return a Transaction handle.        Nested transactions require SAVEPOINT support in the        underlying database.  Any transaction in the hierarchy may        ``commit`` and ``rollback``, however the outermost transaction        still controls the overall ``commit`` or ``rollback`` of the        transaction of a whole.        """        if self.__transaction is None:            self.__transaction = RootTransaction(self)        else:            self.__transaction = NestedTransaction(self, self.__transaction)        return self.__transaction    def begin_twophase(self, xid=None):        """Begin a two-phase or XA transaction and return a Transaction handle.        xid          the two phase transaction id.  If not supplied, a random id          will be generated.        """        if self.__transaction is not None:            raise exceptions.InvalidRequestError(                "Cannot start a two phase transaction when a transaction "                "is already in progress.")        if xid is None:            xid = self.engine.dialect.create_xid();        self.__transaction = TwoPhaseTransaction(self, xid)        return self.__transaction    def recover_twophase(self):        return self.engine.dialect.do_recover_twophase(self)    def rollback_prepared(self, xid, recover=False):        self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)    def commit_prepared(self, xid, recover=False):        self.engine.dialect.do_commit_twophase(self, xid, recover=recover)    def in_transaction(self):        """Return True if a transaction is in progress."""        return self.__transaction is not None    def _begin_impl(self):        if self.engine._should_log_info:            self.engine.logger.info("BEGIN")        try:            self.engine.dialect.do_begin(self.connection)        except Exception, e:            self._handle_dbapi_exception(e, None, None, None)            raise    def _rollback_impl(self):        if not self.closed and not self.invalidated and self.__connection.is_valid:            if self.engine._should_log_info:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -