📄 base.py
字号:
self.engine.logger.info("ROLLBACK") try: self.engine.dialect.do_rollback(self.connection) self.__transaction = None except Exception, e: self._handle_dbapi_exception(e, None, None, None) raise else: self.__transaction = None def _commit_impl(self): if self.engine._should_log_info: self.engine.logger.info("COMMIT") try: self.engine.dialect.do_commit(self.connection) self.__transaction = None except Exception, e: self._handle_dbapi_exception(e, None, None, None) raise def _savepoint_impl(self, name=None): if name is None: self.__savepoint_seq += 1 name = 'sa_savepoint_%s' % self.__savepoint_seq if self.__connection.is_valid: self.engine.dialect.do_savepoint(self, name) return name def _rollback_to_savepoint_impl(self, name, context): if self.__connection.is_valid: self.engine.dialect.do_rollback_to_savepoint(self, name) self.__transaction = context def _release_savepoint_impl(self, name, context): if self.__connection.is_valid: self.engine.dialect.do_release_savepoint(self, name) self.__transaction = context def _begin_twophase_impl(self, xid): if self.__connection.is_valid: self.engine.dialect.do_begin_twophase(self, xid) def _prepare_twophase_impl(self, xid): if self.__connection.is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_prepare_twophase(self, xid) def _rollback_twophase_impl(self, xid, is_prepared): if self.__connection.is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_rollback_twophase(self, xid, is_prepared) self.__transaction = None def _commit_twophase_impl(self, xid, is_prepared): if self.__connection.is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_commit_twophase(self, xid, is_prepared) self.__transaction = None def _autocommit(self, context): """Possibly issue a commit. When no Transaction is present, this is called after statement execution to provide "autocommit" behavior. Dialects may inspect the statement to determine if a commit is actually required. """ # TODO: have the dialect determine if autocommit can be set on # the connection directly without this extra step if not self.in_transaction() and context.should_autocommit: self._commit_impl() def _autorollback(self): if not self.in_transaction(): self._rollback_impl() def close(self): """Close this Connection.""" try: conn = self.__connection except AttributeError: return if not self.__branch: conn.close() self.__invalid = False del self.__connection def scalar(self, object, *multiparams, **params): """Executes and returns the first column of the first row. The underlying result/cursor is closed after execution. """ return self.execute(object, *multiparams, **params).scalar() def statement_compiler(self, statement, **kwargs): return self.dialect.statement_compiler(self.dialect, statement, bind=self, **kwargs) def execute(self, object, *multiparams, **params): """Executes and returns a ResultProxy.""" for c in type(object).__mro__: if c in Connection.executors: return Connection.executors[c](self, object, multiparams, params) else: raise exceptions.InvalidRequestError("Unexecutable object type: " + str(type(object))) def _execute_default(self, default, multiparams=None, params=None): return self.engine.dialect.defaultrunner(self.__create_execution_context()).traverse_single(default) def _execute_text(self, statement, multiparams, params): parameters = self.__distill_params(multiparams, params) context = self.__create_execution_context(statement=statement, parameters=parameters) self.__execute_raw(context) self._autocommit(context) return context.result() def __distill_params(self, multiparams, params): """given arguments from the calling form *multiparams, **params, return a list of bind parameter structures, usually a list of dictionaries. in the case of 'raw' execution which accepts positional parameters, it may be a list of tuples or lists.""" if multiparams is None or len(multiparams) == 0: if params: return [params] else: return [{}] elif len(multiparams) == 1: if isinstance(multiparams[0], (list, tuple)): if isinstance(multiparams[0][0], (list, tuple, dict)): return multiparams[0] else: return [multiparams[0]] elif isinstance(multiparams[0], dict): return [multiparams[0]] else: return [[multiparams[0]]] else: if isinstance(multiparams[0], (list, tuple, dict)): return multiparams else: return [multiparams] def _execute_function(self, func, multiparams, params): return self.execute_clauseelement(func.select(), multiparams, params) def execute_clauseelement(self, elem, multiparams=None, params=None): params = self.__distill_params(multiparams, params) if params: keys = params[0].keys() else: keys = None return self._execute_compiled(elem.compile(dialect=self.dialect, column_keys=keys, inline=len(params) > 1), distilled_params=params) def _execute_compiled(self, compiled, multiparams=None, params=None, distilled_params=None): """Execute a sql.Compiled object.""" if not compiled.can_execute: raise exceptions.ArgumentError("Not an executable clause: %s" % (str(compiled))) if distilled_params is None: distilled_params = self.__distill_params(multiparams, params) context = self.__create_execution_context(compiled=compiled, parameters=distilled_params) context.pre_execution() self.__execute_raw(context) context.post_execution() self._autocommit(context) return context.result() def __execute_raw(self, context): if context.executemany: self._cursor_executemany(context.cursor, context.statement, context.parameters, context=context) else: self._cursor_execute(context.cursor, context.statement, context.parameters[0], context=context) def _execute_ddl(self, ddl, params, multiparams): if params: schema_item, params = params[0], params[1:] else: schema_item = None return ddl(None, schema_item, self, *params, **multiparams) def _handle_dbapi_exception(self, e, statement, parameters, cursor): if getattr(self, '_reentrant_error', False): raise exceptions.DBAPIError.instance(None, None, e) self._reentrant_error = True try: if not isinstance(e, self.dialect.dbapi.Error): return is_disconnect = self.dialect.is_disconnect(e) if is_disconnect: self.invalidate(e) self.engine.dispose() else: if cursor: cursor.close() self._autorollback() if self.__close_with_result: self.close() raise exceptions.DBAPIError.instance(statement, parameters, e, connection_invalidated=is_disconnect) finally: del self._reentrant_error def __create_execution_context(self, **kwargs): try: return self.engine.dialect.create_execution_context(connection=self, **kwargs) except Exception, e: self._handle_dbapi_exception(e, kwargs.get('statement', None), kwargs.get('parameters', None), None) raise def _cursor_execute(self, cursor, statement, parameters, context=None): if self.engine._should_log_info: self.engine.logger.info(statement) self.engine.logger.info(repr(parameters)) try: self.dialect.do_execute(cursor, statement, parameters, context=context) except Exception, e: self._handle_dbapi_exception(e, statement, parameters, cursor) raise def _cursor_executemany(self, cursor, statement, parameters, context=None): if self.engine._should_log_info: self.engine.logger.info(statement) self.engine.logger.info(repr(parameters)) try: self.dialect.do_executemany(cursor, statement, parameters, context=context) except Exception, e: self._handle_dbapi_exception(e, statement, parameters, cursor) raise # poor man's multimethod/generic function thingy executors = { expression._Function: _execute_function, expression.ClauseElement: execute_clauseelement, Compiled: _execute_compiled, schema.SchemaItem: _execute_default, schema.DDL: _execute_ddl, basestring: _execute_text } def create(self, entity, **kwargs): """Create a Table or Index given an appropriate Schema object.""" return self.engine.create(entity, connection=self, **kwargs) def drop(self, entity, **kwargs): """Drop a Table or Index given an appropriate Schema object.""" return self.engine.drop(entity, connection=self, **kwargs) def reflecttable(self, table, include_columns=None): """Reflect the columns in the given string table name from the database.""" return self.engine.reflecttable(table, self, include_columns) def default_schema_name(self): return self.engine.dialect.get_default_schema_name(self) def run_callable(self, callable_): return callable_(self)class Transaction(object): """Represent a Transaction in progress. The Transaction object is **not** threadsafe. """ def __init__(self, connection, parent): self._connection = connection self._parent = parent or self self._is_active = True def connection(self): "The Connection object referenced by this Transaction" return self._connection connection = property(connection) def is_active(self): return self._is_active is_active = property(is_active) def close(self): """Close this transaction. If this transaction is the base transaction in a begin/commit nesting, the transaction will rollback(). Otherwise, the method returns. This is used to cancel a Transaction without affecting the scope of an enclosing transaction. """ if not self._parent._is_active: return if self._parent is self: self.rollback() def rollback(self): if not self._parent._is_active: return self._is_active = False self._do_rollback() def _do_rollback(self): self._parent.rollback() def commit(self): if not self._parent._is_active: raise exceptions.InvalidRequestError("This transaction is inactive") self._do_commit() self._is_active = False def _do_commit(self): pass def __enter__(self): return self def __exit__(self, type, value, traceback): if type is None and self._is_active: self.commit() else: self.rollback()class RootTransaction(Transaction): def __init__(self, connection): super(RootTransaction, self).__init__(connection, None) self._connection._begin_impl() def _do_rollback(self): self._connection._rollback_impl() def _do_commit(self): self._connection._commit_impl()class NestedTransaction(Transaction): def __init__(self, connection, parent): super(NestedTransaction, self).__init__(connection, parent) self._savepoint = self._connection._savepoint_impl() def _do_rollback(self): self._connection._rollback_to_savepoint_impl(self._savepoint, self._parent) def _do_commit(self): self._connection._release_savepoint_impl(self._savepoint, self._parent)class TwoPhaseTransaction(Transaction): def __init__(self, connection, xid): super(TwoPhaseTransaction, self).__init__(connection, None) self._is_prepared = False self.xid = xid self._connection._begin_twophase_impl(self.xid) def prepare(self): if not self._parent._is_active: raise exceptions.InvalidRequestError("This transaction is inactive") self._connection._prepare_twophase_impl(self.xid) self._is_prepared = True def _do_rollback(self): self._connection._rollback_twophase_impl(self.xid, self._is_prepared) def commit(self): self._connection._commit_twophase_impl(self.xid, self._is_prepared)class Engine(Connectable): """
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -