📄 multithreadedhttpconnectionmanager.java
字号:
// remove the connection from the timeout handler idleConnectionHandler.remove(connection); } else if (LOG.isDebugEnabled()) { LOG.debug("There were no free connections to get, hostConfig=" + hostConfiguration); } return connection; } /** * Deletes all closed connections. */ public synchronized void deleteClosedConnections() { Iterator iter = freeConnections.iterator(); while (iter.hasNext()) { HttpConnection conn = (HttpConnection) iter.next(); if (!conn.isOpen()) { iter.remove(); deleteConnection(conn); } } } /** * Closes idle connections. * @param idleTimeout */ public synchronized void closeIdleConnections(long idleTimeout) { idleConnectionHandler.closeIdleConnections(idleTimeout); } /** * Deletes the given connection. This will remove all reference to the connection * so that it can be GCed. * * <p><b>Note:</b> Does not remove the connection from the freeConnections list. It * is assumed that the caller has already handled this case.</p> * * @param connection The connection to delete */ private synchronized void deleteConnection(HttpConnection connection) { HostConfiguration connectionConfiguration = configurationForConnection(connection); if (LOG.isDebugEnabled()) { LOG.debug("Reclaiming connection, hostConfig=" + connectionConfiguration); } connection.close(); HostConnectionPool hostPool = getHostPool(connectionConfiguration, true); hostPool.freeConnections.remove(connection); hostPool.numConnections--; numConnections--; if ((hostPool.numConnections == 0) && hostPool.waitingThreads.isEmpty()) { mapHosts.remove(connectionConfiguration); } // remove the connection from the timeout handler idleConnectionHandler.remove(connection); } /** * Close and delete an old, unused connection to make room for a new one. 
*/ public synchronized void deleteLeastUsedConnection() { HttpConnection connection = (HttpConnection) freeConnections.removeFirst(); if (connection != null) { deleteConnection(connection); } else if (LOG.isDebugEnabled()) { LOG.debug("Attempted to reclaim an unused connection but there were none."); } } /** * Notifies a waiting thread that a connection for the given configuration is * available. * @param configuration the host config to use for notifying * @see #notifyWaitingThread(HostConnectionPool) */ public synchronized void notifyWaitingThread(HostConfiguration configuration) { notifyWaitingThread(getHostPool(configuration, true)); } /** * Notifies a waiting thread that a connection for the given configuration is * available. This will wake a thread waiting in this host pool or if there is not * one a thread in the connection pool will be notified. * * @param hostPool the host pool to use for notifying */ public synchronized void notifyWaitingThread(HostConnectionPool hostPool) { // find the thread we are going to notify, we want to ensure that each // waiting thread is only interrupted once so we will remove it from // all wait queues before interrupting it WaitingThread waitingThread = null; if (hostPool.waitingThreads.size() > 0) { if (LOG.isDebugEnabled()) { LOG.debug("Notifying thread waiting on host pool, hostConfig=" + hostPool.hostConfiguration); } waitingThread = (WaitingThread) hostPool.waitingThreads.removeFirst(); waitingThreads.remove(waitingThread); } else if (waitingThreads.size() > 0) { if (LOG.isDebugEnabled()) { LOG.debug("No-one waiting on host pool, notifying next waiting thread."); } waitingThread = (WaitingThread) waitingThreads.removeFirst(); waitingThread.hostConnectionPool.waitingThreads.remove(waitingThread); } else if (LOG.isDebugEnabled()) { LOG.debug("Notifying no-one, there are no waiting threads"); } if (waitingThread != null) { waitingThread.interruptedByConnectionPool = true; waitingThread.thread.interrupt(); } } /** * Marks 
the given connection as free. * @param conn a connection that is no longer being used */ public void freeConnection(HttpConnection conn) { HostConfiguration connectionConfiguration = configurationForConnection(conn); if (LOG.isDebugEnabled()) { LOG.debug("Freeing connection, hostConfig=" + connectionConfiguration); } synchronized (this) { if (shutdown) { // the connection manager has been shutdown, release the connection's // resources and get out of here conn.close(); return; } HostConnectionPool hostPool = getHostPool(connectionConfiguration, true); // Put the connect back in the available list and notify a waiter hostPool.freeConnections.add(conn); if (hostPool.numConnections == 0) { // for some reason this connection pool didn't already exist LOG.error("Host connection pool not found, hostConfig=" + connectionConfiguration); hostPool.numConnections = 1; } freeConnections.add(conn); // we can remove the reference to this connection as we have control over // it again. this also ensures that the connection manager can be GCed removeReferenceToConnection((HttpConnectionWithReference) conn); if (numConnections == 0) { // for some reason this connection pool didn't already exist LOG.error("Host connection pool not found, hostConfig=" + connectionConfiguration); numConnections = 1; } // register the connection with the timeout handler idleConnectionHandler.add(conn); notifyWaitingThread(hostPool); } } } /** * A simple struct-like class to combine the objects needed to release a connection's * resources when claimed by the garbage collector. */ private static class ConnectionSource { /** The connection pool that created the connection */ public ConnectionPool connectionPool; /** The connection's host configuration */ public HostConfiguration hostConfiguration; } /** * A simple struct-like class to combine the connection list and the count * of created connections. 
*/
    private static class HostConnectionPool {

        /** The hostConfig this pool is for */
        public HostConfiguration hostConfiguration;

        /** The list of free connections */
        public LinkedList freeConnections = new LinkedList();

        /** The list of WaitingThreads for this host */
        public LinkedList waitingThreads = new LinkedList();

        /** The number of created connections */
        public int numConnections = 0;
    }

    /**
     * Plain data holder pairing a waiting thread with the host connection
     * pool it is waiting on.
     */
    private static class WaitingThread {

        /** The thread that is waiting for a connection */
        public Thread thread;

        /** The connection pool the thread is waiting for */
        public HostConnectionPool hostConnectionPool;

        /**
         * Flag to indicate if the thread was interrupted by the ConnectionPool. Set
         * to true inside {@link ConnectionPool#notifyWaitingThread(HostConnectionPool)}
         * before the thread is interrupted.
         */
        public boolean interruptedByConnectionPool = false;
    }

    /**
     * A thread for listening for HttpConnections reclaimed by the garbage
     * collector.
     */
    private static class ReferenceQueueThread extends Thread {

        /** Set by {@link #shutdown()}; checked by the run loop. */
        private volatile boolean shutdown = false;

        /**
         * Creates the thread, names it and marks it as a daemon thread.
         */
        public ReferenceQueueThread() {
            setName("MultiThreadedHttpConnectionManager cleanup");
            setDaemon(true);
        }

        /**
         * Raises the shutdown flag and then interrupts this thread.
         */
        public void shutdown() {
            shutdown = true;
            interrupt();
        }

        /**
         * Handles cleaning up for the given connection reference.
* * @param ref the reference to clean up */ private void handleReference(Reference ref) { ConnectionSource source = null; synchronized (REFERENCE_TO_CONNECTION_SOURCE) { source = (ConnectionSource) REFERENCE_TO_CONNECTION_SOURCE.remove(ref); } // only clean up for this reference if it is still associated with // a ConnectionSource if (source != null) { if (LOG.isDebugEnabled()) { LOG.debug( "Connection reclaimed by garbage collector, hostConfig=" + source.hostConfiguration); } source.connectionPool.handleLostConnection(source.hostConfiguration); } } /** * Start execution. */ public void run() { while (!shutdown) { try { // remove the next reference and process it Reference ref = REFERENCE_QUEUE.remove(); if (ref != null) { handleReference(ref); } } catch (InterruptedException e) { LOG.debug("ReferenceQueueThread interrupted", e); } } } } /** * A connection that keeps a reference to itself. */ private static class HttpConnectionWithReference extends HttpConnection { public WeakReference reference = new WeakReference(this, REFERENCE_QUEUE); /** * @param hostConfiguration */ public HttpConnectionWithReference(HostConfiguration hostConfiguration) { super(hostConfiguration); } } /** * An HttpConnection wrapper that ensures a connection cannot be used * once released. */ private static class HttpConnectionAdapter extends HttpConnection { // the wrapped connection private HttpConnection wrappedConnection; /** * Creates a new HttpConnectionAdapter. * @param connection the connection to be wrapped */ public HttpConnectionAdapter(HttpConnection connection) { super(connection.getHost(), connection.getPort(), connection.getProtocol()); this.wrappedConnection = connection; } /** * Tests if the wrapped connection is still available. 
* @return boolean
         */
        protected boolean hasConnection() {
            return wrappedConnection != null;
        }

        /**
         * @return HttpConnection the wrapped connection; may be null
         */
        HttpConnection getWrappedConnection() {
            return wrappedConnection;
        }

        /**
         * Closes the wrapped connection; does nothing when there is none.
         */
        public void close() {
            if (!hasConnection()) {
                // do nothing
                return;
            }
            wrappedConnection.close();
        }

        /**
         * @return the wrapped connection's local address, or null when there
         *         is no wrapped connection
         */
        public InetAddress getLocalAddress() {
            return hasConnection() ? wrappedConnection.getLocalAddress() : null;
        }

        /**
         * @deprecated
         */
        public boolean isStaleCheckingEnabled() {
            return hasConnection() ? wrappedConnection.isStaleCheckingEnabled() : false;
        }

        /**
         * Forwards the local address to the wrapped connection.
         *
         * @param localAddress the value to forward
         * @throws IllegalStateException when there is no wrapped connection
         */
        public void setLocalAddress(InetAddress localAddress) {
            if (!hasConnection()) {
                throw new IllegalStateException("Connection has been released");
            }
            wrappedConnection.setLocalAddress(localAddress);
        }

        /**
         * @deprecated
         */
        public void setStaleCheckingEnabled(boolean staleCheckEnabled) {
            if (!hasConnection()) {
                throw new IllegalStateException("Connection has been released");
            }
            wrappedConnection.setStaleCheckingEnabled(staleCheckEnabled);
        }

        /**
         * @return the wrapped connection's host, or null when there is no
         *         wrapped connection
         */
        public String getHost() {
            return hasConnection() ? wrappedConnection.getHost() : null;
        }

        /**
         * @return the wrapped connection's connection manager, or null when
         *         there is no wrapped connection
         */
        public HttpConnectionManager getHttpConnectionManager() {
            return hasConnection() ? wrappedConnection.getHttpConnectionManager() : null;
        }

        /**
         * @return the wrapped connection's last response input stream, or null
         *         when there is no wrapped connection
         */
        public InputStream getLastResponseInputStream() {
            return hasConnection() ? wrappedConnection.getLastResponseInputStream() : null;
        }

        /**
         * @return the wrapped connection's port, or -1 when there is no
         *         wrapped connection
         */
        public int getPort() {
            return hasConnection() ? wrappedConnection.getPort() : -1;
        }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -