📄 pool.py
字号:
"""Separate this connection from its Pool. This means that the connection will no longer be returned to the pool when closed, and will instead be literally closed. The containing ConnectionRecord is separated from the DB-API connection, and will create a new connection when next used. Note that any overall connection limiting constraints imposed by a Pool implementation may be violated after a detach, as the detached connection is removed from the pool's knowledge and control. """ if self._connection_record is not None: self._connection_record.connection = None self._connection_record.backref = None self._pool.do_return_conn(self._connection_record) self._detached_info = \ self._connection_record.info.copy() self._connection_record = None def close(self): self.__counter -=1 if self.__counter == 0: self._close() def _close(self): _finalize_fairy(self.connection, self._connection_record, self._pool) self.connection = None self._connection_record = Noneclass _CursorFairy(object): def __init__(self, parent, cursor): self.__parent = parent self.cursor = cursor def invalidate(self, e=None): self.__parent.invalidate(e=e) def close(self): try: self.cursor.close() except Exception, e: try: ex_text = str(e) except TypeError: ex_text = repr(e) self.__parent._logger.warn("Error closing cursor: " + ex_text) if isinstance(e, (SystemExit, KeyboardInterrupt)): raise def __getattr__(self, key): return getattr(self.cursor, key)class SingletonThreadPool(Pool): """A Pool that maintains one connection per thread. Maintains one connection per each thread, never moving a connection to a thread other than the one which it was created in. This is used for SQLite, which both does not handle multithreading by default, and also requires a singleton connection if a :memory: database is being used. Options are the same as those of Pool, as well as: pool_size: 5 The number of threads in which to maintain connections at once. """ def __init__(self, creator, pool_size=5, **params): params['use_threadlocal'] = True Pool.__init__(self, creator, **params) self._conns = {} self.size = pool_size def recreate(self): self.log("Pool recreating") return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners) def dispose(self): """Dispose of this pool. this method leaves the possibility of checked-out connections remaining opened, so it is advised to not reuse the pool once dispose() is called, and to instead use a new pool constructed by the recreate() method. """ for key, conn in self._conns.items(): try: conn.close() except (SystemExit, KeyboardInterrupt): raise except: # sqlite won't even let you close a conn from a thread # that didn't create it pass del self._conns[key] def dispose_local(self): try: del self._conns[thread.get_ident()] except KeyError: pass def cleanup(self): for key in self._conns.keys(): try: del self._conns[key] except KeyError: pass if len(self._conns) <= self.size: return def status(self): return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns)) def do_return_conn(self, conn): pass def do_get(self): try: return self._conns[thread.get_ident()] except KeyError: c = self.create_connection() self._conns[thread.get_ident()] = c if len(self._conns) > self.size: self.cleanup() return cclass QueuePool(Pool): """A Pool that imposes a limit on the number of open connections. Arguments include all those used by the base Pool class, as well as: pool_size The size of the pool to be maintained. This is the largest number of connections that will be kept persistently in the pool. Note that the pool begins with no connections; once this number of connections is requested, that number of connections will remain. Defaults to 5. max_overflow The maximum overflow size of the pool. When the number of checked-out connections reaches the size set in pool_size, additional connections will be returned up to this limit. When those additional connections are returned to the pool, they are disconnected and discarded. It follows then that the total number of simultaneous connections the pool will allow is pool_size + `max_overflow`, and the total number of "sleeping" connections the pool will allow is pool_size. `max_overflow` can be set to -1 to indicate no overflow limit; no limit will be placed on the total number of concurrent connections. Defaults to 10. timeout The number of seconds to wait before giving up on returning a connection. Defaults to 30. """ def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params): Pool.__init__(self, creator, **params) self._pool = Queue.Queue(pool_size) self._overflow = 0 - pool_size self._max_overflow = max_overflow self._timeout = timeout self._overflow_lock = self._max_overflow > -1 and threading.Lock() or None def recreate(self): self.log("Pool recreating") return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners) def do_return_conn(self, conn): try: self._pool.put(conn, False) except Queue.Full: if self._overflow_lock is None: self._overflow -= 1 else: self._overflow_lock.acquire() try: self._overflow -= 1 finally: self._overflow_lock.release() def do_get(self): try: wait = self._max_overflow > -1 and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) except Queue.Empty: if self._max_overflow > -1 and self._overflow >= self._max_overflow: if not wait: return self.do_get() else: raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout)) if self._overflow_lock is not None: self._overflow_lock.acquire() if self._max_overflow > -1 and self._overflow >= self._max_overflow: if self._overflow_lock is not None: self._overflow_lock.release() return self.do_get() try: con = self.create_connection() self._overflow += 1 finally: if self._overflow_lock is not None: self._overflow_lock.release() return con def dispose(self): while True: try: conn = self._pool.get(False) conn.close() except Queue.Empty: break self._overflow = 0 - self.size() if self._should_log_info: self.log("Pool disposed. " + self.status()) def status(self): tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout()) return "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup def size(self): return self._pool.maxsize def checkedin(self): return self._pool.qsize() def overflow(self): return self._overflow def checkedout(self): return self._pool.maxsize - self._pool.qsize() + self._overflowclass NullPool(Pool): """A Pool which does not pool connections. Instead it literally opens and closes the underlying DB-API connection per each connection open/close. """ def status(self): return "NullPool" def do_return_conn(self, conn): conn.close() def do_return_invalid(self, conn): pass def do_get(self): return self.create_connection()class StaticPool(Pool): """A Pool of exactly one connection, used for all requests.""" def __init__(self, creator, **params): Pool.__init__(self, creator, **params) self._conn = creator() self.connection = _ConnectionRecord(self) def status(self): return "StaticPool" def create_connection(self): return self._conn def do_return_conn(self, conn): pass def do_return_invalid(self, conn): pass def do_get(self): return self.connectionclass AssertionPool(Pool): """A Pool that allows at most one checked out connection at any given time. This will raise an exception if more than one connection is checked out at a time. Useful for debugging code that is using more connections than desired. """ ## TODO: modify this to handle an arbitrary connection count. def __init__(self, creator, **params): Pool.__init__(self, creator, **params) self.connection = _ConnectionRecord(self) self._conn = self.connection def status(self): return "AssertionPool" def create_connection(self): raise "Invalid" def do_return_conn(self, conn): assert conn is self._conn and self.connection is None self.connection = conn def do_return_invalid(self, conn): raise "Invalid" def do_get(self): assert self.connection is not None c = self.connection self.connection = None return cclass _DBProxy(object): """Layers connection pooling behavior on top of a standard DB-API module. Proxies a DB-API 2.0 connect() call to a connection pool keyed to the specific connect parameters. Other functions and attributes are delegated to the underlying DB-API module. """ def __init__(self, module, poolclass=QueuePool, **params): """Initializes a new proxy. module a DB-API 2.0 module poolclass a Pool class, defaulting to QueuePool Other parameters are sent to the Pool object's constructor. """ self.module = module self.params = params self.poolclass = poolclass self.pools = {} def close(self): for key in self.pools.keys(): del self.pools[key] def __del__(self): self.close() def __getattr__(self, key): return getattr(self.module, key) def get_pool(self, *args, **params): key = self._serialize(*args, **params) try: return self.pools[key] except KeyError: pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params) self.pools[key] = pool return pool def connect(self, *args, **params): """Activate a connection to the database. Connect to the database using this DBProxy's module and the given connect arguments. If the arguments match an existing pool, the connection will be returned from the pool's current thread-local connection instance, or if there is no thread-local connection instance it will be checked out from the set of pooled connections. If the pool has no available connections and allows new connections to be created, a new database connection will be made. """ return self.get_pool(*args, **params).connect() def dispose(self, *args, **params): """Dispose the connection pool referenced by the given connect arguments.""" key = self._serialize(*args, **params) try: del self.pools[key] except KeyError: pass def _serialize(self, *args, **params): return pickle.dumps([args, params])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -