defaultconnectionpool.java

来自「一个java方面的消息订阅发送的源码」· Java 代码 · 共 644 行 · 第 1/2 页

JAVA
644
字号
    public void close() throws ResourceException {
        // The first failure encountered. It is rethrown once every acceptor
        // and connection has been given the chance to shut down, so that a
        // single failure cannot leave the remaining resources open.
        ResourceException failure = null;

        // take a snapshot, so that the acceptors can be closed after the
        // underlying collection has been cleared
        ManagedConnectionAcceptor[] acceptors =
                (ManagedConnectionAcceptor[]) _acceptors.toArray(
                        new ManagedConnectionAcceptor[0]);
        _acceptors.clear();

        try {
            for (int i = 0; i < acceptors.length; ++i) {
                try {
                    acceptors[i].close();
                } catch (ResourceException exception) {
                    if (failure == null) {
                        failure = exception;
                    } else {
                        // only the first failure is propagated
                        _log.debug("Failed to close acceptor", exception);
                    }
                }
            }

            ManagedConnection[] connections =
                    (ManagedConnection[]) _entries.keySet().toArray(
                            new ManagedConnection[0]);
            for (int i = 0; i < connections.length; ++i) {
                try {
                    connections[i].destroy();
                } catch (ResourceException exception) {
                    if (failure == null) {
                        failure = exception;
                    } else {
                        // only the first failure is propagated
                        _log.debug("Failed to destroy connection", exception);
                    }
                }
            }
        } finally {
            // always discard the pool state and stop the reaper, even if an
            // unexpected runtime error occurred above
            _entries.clear();
            _accepted.clear();
            _connections.clear();
            // handles are registered by add() and released by remove(); they
            // must not outlive the connections they wrap
            _handles.clear();

            stopReaper();
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Handles an error reported by an acceptor.
     * <p>
     * The acceptor is removed from the set of active acceptors, the error is
     * logged, and the acceptor is closed. A failure to close the acceptor is
     * only logged at debug level.
     *
     * @param acceptor  the acceptor which received the error
     * @param throwable the error
     */
    public void error(ManagedConnectionAcceptor acceptor,
                      Throwable throwable) {
        _acceptors.remove(acceptor);

        // determine the acceptor's address for logging purposes, falling back
        // to a placeholder if it cannot be obtained
        String location;
        try {
            location = acceptor.getURI().toString();
        } catch (ResourceException ignore) {
            location = "<unknown>";
        }

        String message = "Failed to accept connections on URI=" + location;
        _log.error(message, throwable);

        try {
            acceptor.close();
        } catch (ResourceException closeFailure) {
            if (_log.isDebugEnabled()) {
                _log.debug("Failed to close acceptor, URI=" + location,
                           closeFailure);
            }
        }
    }

    /**
     * Registers the listener to be notified of caller events.
     * <p>
     * Any previously registered listener is replaced.
     *
     * @param listener the listener to notify
     */
    public void setCallerListener(CallerListener listener) {
        this._listener = listener;
    }

    /**
     * Registers a connection with the pool and prepares it for use.
     * <p>
     * Accepted connections are returned as is. Connections that were created
     * are wrapped in a {@link ManagedConnectionHandle}, and the handle is
     * returned instead.
     * <p>
     * If the connection cannot be initialised, it is destroyed and removed
     * from the pool again before the exception is propagated.
     *
     * @param connection the connection to add
     * @param accepted   if <code>true</code> the connection was accepted via
     *                   an {@link ManagedConnectionAcceptor}, otherwise it was
     *                   created via
     *                   {@link ManagedConnectionFactory#createManagedConnection}
     * @return the supplied connection if it was accepted, otherwise a handle
     *         wrapping it
     * @throws ResourceException if the connection cannot be initialised
     */
    protected ManagedConnection add(ManagedConnection connection,
                                    boolean accepted) throws ResourceException {
        PoolEntry pooled = new PoolEntry(connection, accepted);
        _entries.put(connection, pooled);

        ManagedConnection pooledConnection;
        if (!accepted) {
            // created connections are handed out via a handle
            _connections.add(connection);
            ManagedConnection wrapper = new ManagedConnectionHandle(
                    connection, _resolver);
            _handles.put(connection, wrapper);
            pooledConnection = wrapper;
        } else {
            _accepted.add(connection);
            pooledConnection = connection;
        }

        ContextInvocationHandler invoker = new ContextInvocationHandler(
                _handler, _resolver);
        try {
            connection.setInvocationHandler(invoker);
            connection.setConnectionEventListener(this);
            invoker.setConnection(connection.getConnection());
        } catch (ResourceException failure) {
            // initialisation failed: destroy the connection and undo the
            // registrations performed above
            try {
                _log.debug("Failed to initialise connection, destroying",
                           failure);
                connection.destroy();
            } catch (ResourceException destroyFailure) {
                _log.debug("Failed to destroy connection", destroyFailure);
            } finally {
                _entries.remove(connection);
                if (!accepted) {
                    _connections.remove(connection);
                    _handles.remove(connection);
                } else {
                    _accepted.remove(connection);
                }
            }
            // let the caller know that the connection could not be added
            throw failure;
        }

        // the reaper only considers entries that have been flagged as
        // initialised
        pooled.setInitialised();

        startReaper();

        return pooledConnection;
    }

    /**
     * Removes a connection from the pool and destroys it.
     * <p>
     * If both the remote and local addresses of the connection could be
     * determined, the registered listener (if any) is notified of the
     * disconnection. The reaper is stopped when the pool has no entries left.
     *
     * @param connection the connection to remove
     */
    protected void remove(ManagedConnection connection) {
        PoolEntry pooled = (PoolEntry) _entries.remove(connection);
        if (pooled == null) {
            _log.debug("ManagedConnection not found");
        } else {
            if (!pooled.getAccepted()) {
                _connections.remove(connection);
                _handles.remove(connection);
            } else {
                _accepted.remove(connection);
            }

            // look up the addresses before the connection is destroyed
            URI remote = null;
            URI local = null;
            try {
                remote = connection.getRemoteURI();
                local = connection.getLocalURI();
            } catch (ResourceException lookupFailure) {
                _log.debug("Failed to get connection URIs", lookupFailure);
            }

            try {
                connection.destroy();
            } catch (ResourceException destroyFailure) {
                _log.debug("Failed to destroy connection", destroyFailure);
            }

            // only notify when both addresses are known
            boolean addressesKnown = (remote != null) && (local != null);
            if (addressesKnown) {
                notifyDisconnection(remote, local);
            }
        }

        if (_entries.isEmpty()) {
            stopReaper();
        }
    }

    /**
     * Informs the registered listener, if there is one, that a caller has
     * disconnected.
     *
     * @param remoteURI the remote address that the client is calling from
     * @param localURI  the local address that the client is calling to
     */
    private void notifyDisconnection(URI remoteURI, URI localURI) {
        // read the field once, so the null check and the call use the same
        // reference
        CallerListener target = _listener;
        if (target == null) {
            return;
        }
        target.disconnected(new CallerImpl(remoteURI, localURI));
    }

    /**
     * Starts the reaper for dead/idle connections, unless it is already
     * running or the reap interval is not positive.
     */
    private synchronized void startReaper() {
        if (_daemon != null || _reapInterval <= 0) {
            // already running, or reaping is disabled
            return;
        }

        _daemon = new ClockDaemon();
        _daemon.setThreadFactory(
                new ThreadFactory(null, "ManagedConnectionReaper", false));
        _daemon.executePeriodically(_reapInterval, new Reaper(), false);
    }

    /**
     * Stops the reaper for dead/idle connections, if it is running.
     */
    private synchronized void stopReaper() {
        if (_daemon == null) {
            // nothing to stop
            return;
        }
        _daemon.shutDown();
        _daemon = null;
    }

    /**
     * Task that removes idle and dead connections from the pool each time it
     * is run.
     */
    private class Reaper implements Runnable {

        /**
         * The time that the reaper was created or last completed a run, in
         * milliseconds.
         */
        private long _lastReapTimestamp;

        /**
         * Construct a new <code>Reaper</code>.
         */
        public Reaper() {
            _lastReapTimestamp = System.currentTimeMillis();
        }

        /**
         * Reaps idle connections, followed by dead connections unless the
         * current thread has been interrupted. Any error is logged rather
         * than propagated.
         */
        public void run() {
            try {
                reapIdleConnections();
                if (!done()) {
                    reapDeadConnections();
                }
            } catch (Throwable exception) {
                _log.error(exception, exception);
            }
            _lastReapTimestamp = System.currentTimeMillis();
        }

        /**
         * Removes each initialised connection whose handle permits
         * destruction and which is idle.
         */
        private void reapIdleConnections() {
            // iterate over a snapshot, as remove() modifies the handles
            Map.Entry[] snapshot = (Map.Entry[]) _handles.entrySet().toArray(
                    new Map.Entry[0]);
            for (int i = 0; i < snapshot.length && !done(); ++i) {
                Map.Entry mapping = snapshot[i];
                ManagedConnection connection =
                        (ManagedConnection) mapping.getKey();
                PoolEntry pooled = (PoolEntry) _entries.get(connection);
                if (pooled == null || !pooled.isInitialised()) {
                    continue;
                }
                ManagedConnectionHandle handle =
                        (ManagedConnectionHandle) mapping.getValue();
                if (!handle.canDestroy() || !idle(handle)) {
                    continue;
                }
                logReaping("Reaping idle connection, URI=", connection);
                remove(connection);
            }
        }

        /**
         * Determines if a connection is idle.
         * <p>
         * A handle with a last used timestamp of zero is treated as not yet
         * used: its timestamp is updated, and it is not reported as idle on
         * this invocation. Otherwise, the connection is idle if it was last
         * used before the timestamp recorded by the reaper.
         *
         * @param handle the handle to the underlying managed connection
         * @return <code>true</code> if the underlying connection is idle,
         *         otherwise <code>false</code>
         */
        private boolean idle(ManagedConnectionHandle handle) {
            long lastUsed = handle.getLastUsedTimestamp();
            if (lastUsed == 0) {
                handle.updateLastUsedTimestamp();
                return false;
            }
            return lastUsed < _lastReapTimestamp;
        }

        /**
         * Removes each initialised connection that reports it is no longer
         * alive.
         */
        private void reapDeadConnections() {
            // iterate over a snapshot, as remove() modifies the entries
            PoolEntry[] snapshot = (PoolEntry[]) _entries.values().toArray(
                    new PoolEntry[0]);
            for (int i = 0; i < snapshot.length && !done(); ++i) {
                PoolEntry pooled = snapshot[i];
                if (!pooled.isInitialised()) {
                    continue;
                }
                ManagedConnection connection = pooled.getManagedConnection();
                if (connection.isAlive()) {
                    continue;
                }
                logReaping("Reaping dead connection, URI=", connection);
                remove(connection);
            }
        }

        /**
         * Logs, at debug level, that a connection is about to be reaped.
         * Nothing is logged if the connection's addresses cannot be
         * determined.
         *
         * @param prefix     the start of the message, to which the remote
         *                   address is appended
         * @param connection the connection being reaped
         */
        private void logReaping(String prefix, ManagedConnection connection) {
            if (!_log.isDebugEnabled()) {
                return;
            }
            try {
                _log.debug(prefix
                           + connection.getRemoteURI()
                           + ", local URI="
                           + connection.getLocalURI());
            } catch (ResourceException ignore) {
                // do nothing
            }
        }

        /**
         * Determines if the reaper should terminate, by checking the
         * interrupt status of the current thread.
         *
         * @return <code>true</code> if the reaper should terminate
         */
        private boolean done() {
            return Thread.currentThread().isInterrupted();
        }
    }

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?