⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pool.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
        """Separate this connection from its Pool.        This means that the connection will no longer be returned to the        pool when closed, and will instead be literally closed.  The        containing ConnectionRecord is separated from the DB-API connection,        and will create a new connection when next used.        Note that any overall connection limiting constraints imposed by a        Pool implementation may be violated after a detach, as the detached        connection is removed from the pool's knowledge and control.        """        if self._connection_record is not None:            self._connection_record.connection = None            self._connection_record.backref = None            self._pool.do_return_conn(self._connection_record)            self._detached_info = \              self._connection_record.info.copy()            self._connection_record = None    def close(self):        self.__counter -=1        if self.__counter == 0:            self._close()    def _close(self):        _finalize_fairy(self.connection, self._connection_record, self._pool)        self.connection = None        self._connection_record = Noneclass _CursorFairy(object):    def __init__(self, parent, cursor):        self.__parent = parent        self.cursor = cursor    def invalidate(self, e=None):        self.__parent.invalidate(e=e)    def close(self):        try:            self.cursor.close()        except Exception, e:            try:                ex_text = str(e)            except TypeError:                ex_text = repr(e)            self.__parent._logger.warn("Error closing cursor: " + ex_text)            if isinstance(e, (SystemExit, KeyboardInterrupt)):                raise    def __getattr__(self, key):        return getattr(self.cursor, key)class SingletonThreadPool(Pool):    """A Pool that maintains one connection per thread.    Maintains one connection per each thread, never moving a connection to a    thread other than the one which it was created in.    This is used for SQLite, which both does not handle multithreading by    default, and also requires a singleton connection if a :memory: database    is being used.    Options are the same as those of Pool, as well as:    pool_size: 5      The number of threads in which to maintain connections at once.    """    def __init__(self, creator, pool_size=5, **params):        params['use_threadlocal'] = True        Pool.__init__(self, creator, **params)        self._conns = {}        self.size = pool_size    def recreate(self):        self.log("Pool recreating")        return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)    def dispose(self):        """Dispose of this pool.        this method leaves the possibility of checked-out connections        remaining opened, so it is advised to not reuse the pool once        dispose() is called, and to instead use a new pool constructed        by the recreate() method.        """        for key, conn in self._conns.items():            try:                conn.close()            except (SystemExit, KeyboardInterrupt):                raise            except:                # sqlite won't even let you close a conn from a thread                # that didn't create it                pass            del self._conns[key]    def dispose_local(self):        try:            del self._conns[thread.get_ident()]        except KeyError:            pass    def cleanup(self):        for key in self._conns.keys():            try:                del self._conns[key]            except KeyError:                pass            if len(self._conns) <= self.size:                return    def status(self):        return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))    def do_return_conn(self, conn):        pass    def do_get(self):        try:            return self._conns[thread.get_ident()]        except KeyError:            c = self.create_connection()            self._conns[thread.get_ident()] = c            if len(self._conns) > self.size:                self.cleanup()            return cclass QueuePool(Pool):    """A Pool that imposes a limit on the number of open connections.    Arguments include all those used by the base Pool class, as well    as:    pool_size      The size of the pool to be maintained. This is the largest      number of connections that will be kept persistently in the      pool. Note that the pool begins with no connections; once this      number of connections is requested, that number of connections      will remain. Defaults to 5.    max_overflow      The maximum overflow size of the pool. When the number of      checked-out connections reaches the size set in pool_size,      additional connections will be returned up to this limit. When      those additional connections are returned to the pool, they are      disconnected and discarded. It follows then that the total      number of simultaneous connections the pool will allow is      pool_size + `max_overflow`, and the total number of "sleeping"      connections the pool will allow is pool_size. `max_overflow` can      be set to -1 to indicate no overflow limit; no limit will be      placed on the total number of concurrent connections. Defaults      to 10.    timeout      The number of seconds to wait before giving up on returning a      connection. Defaults to 30.    """    def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params):        Pool.__init__(self, creator, **params)        self._pool = Queue.Queue(pool_size)        self._overflow = 0 - pool_size        self._max_overflow = max_overflow        self._timeout = timeout        self._overflow_lock = self._max_overflow > -1 and threading.Lock() or None    def recreate(self):        self.log("Pool recreating")        return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)    def do_return_conn(self, conn):        try:            self._pool.put(conn, False)        except Queue.Full:            if self._overflow_lock is None:                self._overflow -= 1            else:                self._overflow_lock.acquire()                try:                    self._overflow -= 1                finally:                    self._overflow_lock.release()    def do_get(self):        try:            wait = self._max_overflow > -1 and self._overflow >= self._max_overflow            return self._pool.get(wait, self._timeout)        except Queue.Empty:            if self._max_overflow > -1 and self._overflow >= self._max_overflow:                if not wait:                    return self.do_get()                else:                    raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))            if self._overflow_lock is not None:                self._overflow_lock.acquire()            if self._max_overflow > -1 and self._overflow >= self._max_overflow:                if self._overflow_lock is not None:                    self._overflow_lock.release()                return self.do_get()            try:                con = self.create_connection()                self._overflow += 1            finally:                if self._overflow_lock is not None:                    self._overflow_lock.release()            return con    def dispose(self):        while True:            try:                conn = self._pool.get(False)                conn.close()            except Queue.Empty:                break        self._overflow = 0 - self.size()        if self._should_log_info:            self.log("Pool disposed. " + self.status())    def status(self):        tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())        return "Pool size: %d  Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup    def size(self):        return self._pool.maxsize    def checkedin(self):        return self._pool.qsize()    def overflow(self):        return self._overflow    def checkedout(self):        return self._pool.maxsize - self._pool.qsize() + self._overflowclass NullPool(Pool):    """A Pool which does not pool connections.    Instead it literally opens and closes the underlying DB-API connection    per each connection open/close.    """    def status(self):        return "NullPool"    def do_return_conn(self, conn):       conn.close()    def do_return_invalid(self, conn):       pass    def do_get(self):        return self.create_connection()class StaticPool(Pool):    """A Pool of exactly one connection, used for all requests."""    def __init__(self, creator, **params):        Pool.__init__(self, creator, **params)        self._conn = creator()        self.connection = _ConnectionRecord(self)    def status(self):        return "StaticPool"    def create_connection(self):        return self._conn    def do_return_conn(self, conn):        pass    def do_return_invalid(self, conn):        pass    def do_get(self):        return self.connectionclass AssertionPool(Pool):    """A Pool that allows at most one checked out connection at any given time.    This will raise an exception if more than one connection is checked out    at a time.  Useful for debugging code that is using more connections    than desired.    """    ## TODO: modify this to handle an arbitrary connection count.    def __init__(self, creator, **params):        Pool.__init__(self, creator, **params)        self.connection = _ConnectionRecord(self)        self._conn = self.connection    def status(self):        return "AssertionPool"    def create_connection(self):        raise "Invalid"    def do_return_conn(self, conn):        assert conn is self._conn and self.connection is None        self.connection = conn    def do_return_invalid(self, conn):        raise "Invalid"    def do_get(self):        assert self.connection is not None        c = self.connection        self.connection = None        return cclass _DBProxy(object):    """Layers connection pooling behavior on top of a standard DB-API module.    Proxies a DB-API 2.0 connect() call to a connection pool keyed to the    specific connect parameters. Other functions and attributes are delegated    to the underlying DB-API module.    """    def __init__(self, module, poolclass=QueuePool, **params):        """Initializes a new proxy.        module          a DB-API 2.0 module        poolclass          a Pool class, defaulting to QueuePool        Other parameters are sent to the Pool object's constructor.        """        self.module = module        self.params = params        self.poolclass = poolclass        self.pools = {}    def close(self):        for key in self.pools.keys():            del self.pools[key]    def __del__(self):        self.close()    def __getattr__(self, key):        return getattr(self.module, key)    def get_pool(self, *args, **params):        key = self._serialize(*args, **params)        try:            return self.pools[key]        except KeyError:            pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params)            self.pools[key] = pool            return pool    def connect(self, *args, **params):        """Activate a connection to the database.        Connect to the database using this DBProxy's module and the given        connect arguments.  If the arguments match an existing pool, the        connection will be returned from the pool's current thread-local        connection instance, or if there is no thread-local connection        instance it will be checked out from the set of pooled connections.        If the pool has no available connections and allows new connections        to be created, a new database connection will be made.        """        return self.get_pool(*args, **params).connect()    def dispose(self, *args, **params):        """Dispose the connection pool referenced by the given connect arguments."""        key = self._serialize(*args, **params)        try:            del self.pools[key]        except KeyError:            pass    def _serialize(self, *args, **params):        return pickle.dumps([args, params])

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -